补充leader选举能力

This commit is contained in:
zengqiao
2022-08-31 17:11:12 +08:00
parent 9e3c4dc06b
commit 4114777a4e
3 changed files with 152 additions and 0 deletions

View File

@@ -0,0 +1,19 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.kafka.common.TopicPartition;
import java.util.List;
@Data
@NoArgsConstructor
public class BatchPartitionParam extends ClusterPhyParam {
private List<TopicPartition> tpList;
public BatchPartitionParam(Long clusterPhyId, List<TopicPartition> tpList) {
super(clusterPhyId);
this.tpList = tpList;
}
}

View File

@@ -0,0 +1,14 @@
package com.xiaojukeji.know.streaming.km.core.service.partition;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import org.apache.kafka.common.TopicPartition;
import java.util.List;
public interface OpPartitionService {
/**
* 优先副本选举
*/
Result<Void> preferredReplicaElection(Long clusterPhyId, List<TopicPartition> tpList);
}

View File

@@ -0,0 +1,119 @@
package com.xiaojukeji.know.streaming.km.core.service.partition.impl;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition.BatchPartitionParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
import com.xiaojukeji.know.streaming.km.core.service.partition.OpPartitionService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ElectLeadersOptions;
import org.apache.kafka.clients.admin.ElectLeadersResult;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import scala.jdk.javaapi.CollectionConverters;
import javax.annotation.PostConstruct;
import java.util.HashSet;
import java.util.List;
import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.VC_HANDLE_NOT_EXIST;
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*;
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.SERVICE_OP_PARTITION_LEADER;
/**
* @author didi
*/
@Service
public class OpPartitionServiceImpl extends BaseVersionControlService implements OpPartitionService {
private static final ILog LOGGER = LogFactory.getLog(OpPartitionServiceImpl.class);
@Autowired
private KafkaAdminClient kafkaAdminClient;
@Autowired
private KafkaAdminZKClient kafkaAdminZKClient;
public static final String PREFERRED_REPLICA_ELECTION = "PreferredReplicaElection";
@Override
protected VersionItemTypeEnum getVersionItemType() {
return SERVICE_OP_PARTITION_LEADER;
}
@PostConstruct
private void init() {
registerVCHandler(PREFERRED_REPLICA_ELECTION, V_0_10_0_0, V_2_8_0, "preferredReplicaElectionByZKClient", this::preferredReplicaElectionByZKClient);
registerVCHandler(PREFERRED_REPLICA_ELECTION, V_2_8_0, V_MAX, "preferredReplicaElectionByKafkaClient", this::preferredReplicaElectionByKafkaClient);
}
@Override
public Result<Void> preferredReplicaElection(Long clusterPhyId, List<TopicPartition> tpList) {
try {
return (Result<Void>) doVCHandler(
clusterPhyId,
PREFERRED_REPLICA_ELECTION,
new BatchPartitionParam(clusterPhyId, tpList)
);
} catch (VCHandlerNotExistException e) {
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
}
}
/**************************************************** private method ****************************************************/
private Result<Void> preferredReplicaElectionByZKClient(VersionItemParam itemParam) {
BatchPartitionParam partitionParam = (BatchPartitionParam) itemParam;
try {
KafkaZkClient kafkaZkClient = kafkaAdminZKClient.getClient(partitionParam.getClusterPhyId());
kafkaZkClient.createPreferredReplicaElection(CollectionConverters.asScala(partitionParam.getTpList()).toSet());
return Result.buildSuc();
} catch (Exception e) {
LOGGER.error(
"class=OpPartitionServiceImpl||method=preferredReplicaElectionByZKClient||clusterPhyId={}||errMsg=exception",
partitionParam.getClusterPhyId(), e
);
return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage());
}
}
private Result<Void> preferredReplicaElectionByKafkaClient(VersionItemParam itemParam) {
BatchPartitionParam partitionParam = (BatchPartitionParam) itemParam;
try {
AdminClient adminClient = kafkaAdminClient.getClient(partitionParam.getClusterPhyId());
ElectLeadersResult electLeadersResult = adminClient.electLeaders(
ElectionType.PREFERRED,
new HashSet<>(partitionParam.getTpList()),
new ElectLeadersOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)
);
electLeadersResult.all().get();
return Result.buildSuc();
} catch (Exception e) {
LOGGER.error(
"class=OpPartitionServiceImpl||method=preferredReplicaElectionByKafkaClient||clusterPhyId={}||errMsg=exception",
partitionParam.getClusterPhyId(), e
);
return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage());
}
}
}