Merge the 3.3.0 branch

zengqiao
2023-02-24 17:13:50 +08:00
616 changed files with 32894 additions and 8421 deletions

View File

@@ -0,0 +1,31 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xiaojukeji.kafka</groupId>
    <artifactId>km-ha</artifactId>
    <version>${km.revision}</version>
    <packaging>jar</packaging>

    <parent>
        <artifactId>km</artifactId>
        <groupId>com.xiaojukeji.kafka</groupId>
        <version>${km.revision}</version>
        <relativePath>../../pom.xml</relativePath>
    </parent>

    <dependencies>
        <!-- In-project modules the application layer depends on -->
        <dependency>
            <groupId>com.xiaojukeji.kafka</groupId>
            <artifactId>km-common</artifactId>
            <version>${project.parent.version}</version>
        </dependency>
        <dependency>
            <groupId>com.xiaojukeji.kafka</groupId>
            <artifactId>km-core</artifactId>
            <version>${project.parent.version}</version>
        </dependency>
    </dependencies>
</project>
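For the build to pick up this new km-ha module, the parent km POM must also declare it in its <modules> list. That parent file is not part of the hunks shown here, so the following is only a hypothetical sketch of the required registration; the module path (km-extends/km-ha) is an assumption inferred from the ../../pom.xml relativePath:

    <!-- Hypothetical <modules> entry in the parent km pom.xml; the path is assumed -->
    <modules>
        <module>km-extends/km-ha</module>
    </modules>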

View File

@@ -0,0 +1,30 @@
package com.xiaojukeji.know.streaming.km.ha.mirror.service;

import com.xiaojukeji.know.streaming.km.common.bean.dto.ha.mirror.MirrorTopicCreateDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.ha.mirror.MirrorTopicDeleteDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.vo.ha.mirror.TopicMirrorInfoVO;

import java.util.List;

public interface MirrorTopicService {
    /**
     * Create mirror topics in batch on the destination clusters.
     * @param dtoList creation requests, one per topic
     * @return success, or the first failure encountered
     */
    Result<Void> batchCreateMirrorTopic(List<MirrorTopicCreateDTO> dtoList);

    /**
     * Delete mirror topics in batch and remove their active-standby relations.
     * @param dtoList deletion requests, one per topic
     * @return success, or the first failure encountered
     */
    Result<Void> batchDeleteMirrorTopic(List<MirrorTopicDeleteDTO> dtoList);

    /**
     * List mirroring info (traffic and lag) for a topic on the given cluster.
     * @param clusterPhyId physical cluster ID
     * @param topicName    topic name
     * @return mirror info for each matching active-standby relation
     */
    Result<List<TopicMirrorInfoVO>> getTopicsMirrorInfo(Long clusterPhyId, String topicName);
}
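Callers would typically obtain this interface via Spring injection and pass one DTO per topic. Below is a minimal usage sketch, not taken from this commit; the DTO setter names are assumptions mirroring the getters used by the implementation in the next file:

import com.xiaojukeji.know.streaming.km.common.bean.dto.ha.mirror.MirrorTopicCreateDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Collections;

@Component
public class MirrorTopicExample {
    @Autowired
    private MirrorTopicService mirrorTopicService;

    public void mirrorOneTopic() {
        MirrorTopicCreateDTO dto = new MirrorTopicCreateDTO();
        dto.setSourceClusterPhyId(1L);    // active (source) cluster; assumed setter
        dto.setDestClusterPhyId(2L);      // standby (destination) cluster; assumed setter
        dto.setTopicName("order-events"); // hypothetical topic name
        dto.setSyncConfig(true);          // also mirror topic-level configs; assumed setter

        Result<Void> result = mirrorTopicService.batchCreateMirrorTopic(Collections.singletonList(dto));
        if (result.failed()) {
            // inspect the result for the failure message
        }
    }
}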

View File

@@ -0,0 +1,151 @@
package com.xiaojukeji.know.streaming.km.ha.mirror.service.impl;

import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.dto.ha.mirror.MirrorTopicCreateDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.ha.mirror.MirrorTopicDeleteDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.ha.HaActiveStandbyRelation;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.vo.ha.mirror.TopicMirrorInfoVO;
import com.xiaojukeji.know.streaming.km.common.enums.ha.HaResTypeEnum;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.ha.HaActiveStandbyRelationService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService;
import com.xiaojukeji.know.streaming.km.ha.mirror.service.MirrorTopicService;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient;
import kafka.zk.KafkaZkClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.*;

import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems.TOPIC_METRIC_BYTES_IN;
import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems.TOPIC_METRIC_MIRROR_FETCH_LAG;

@Service
public class MirrorTopicServiceImpl implements MirrorTopicService {
    private static final ILog logger = LogFactory.getLog(MirrorTopicServiceImpl.class);

    @Autowired
    private ClusterPhyService clusterPhyService;

    @Autowired
    private TopicMetricService topicMetricService;

    @Autowired
    private KafkaAdminZKClient kafkaAdminZKClient;

    @Autowired
    private HaActiveStandbyRelationService haActiveStandbyRelationService;

    @Override
    public Result<Void> batchCreateMirrorTopic(List<MirrorTopicCreateDTO> dtoList) {
        for (MirrorTopicCreateDTO mirrorTopicCreateDTO : dtoList) {
            try {
                KafkaZkClient kafkaZkClient = kafkaAdminZKClient.getClient(mirrorTopicCreateDTO.getDestClusterPhyId());

                // Register the source cluster's connection info under the destination cluster's ha-clusters config
                ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(mirrorTopicCreateDTO.getSourceClusterPhyId());
                Properties newHaClusters = ConvertUtil.str2ObjByJson(clusterPhy.getClientProperties(), Properties.class);
                newHaClusters.put("bootstrap.servers", clusterPhy.getBootstrapServers());
                if (clusterPhy.getKafkaVersion().contains("2.5.0-d-")) {
                    newHaClusters.put("didi.kafka.enable", "true");
                } else {
                    newHaClusters.put("didi.kafka.enable", "false");
                }

                // Rewrite the config and publish a change notification only when it actually changed
                Properties oldHaClusters = kafkaZkClient.getEntityConfigs("ha-clusters", String.valueOf(mirrorTopicCreateDTO.getSourceClusterPhyId()));
                if (!oldHaClusters.equals(newHaClusters)) {
                    kafkaZkClient.setOrCreateEntityConfigs("ha-clusters", String.valueOf(mirrorTopicCreateDTO.getSourceClusterPhyId()), newHaClusters);
                    kafkaZkClient.createConfigChangeNotification("ha-clusters/" + mirrorTopicCreateDTO.getSourceClusterPhyId());
                }

                // The topic must not already exist on the destination cluster
                boolean pathExists = kafkaZkClient.pathExists("/brokers/topics/" + mirrorTopicCreateDTO.getTopicName());
                if (pathExists) {
                    return Result.buildFailure(String.format("Topic %s already exists on the destination cluster; to guarantee data consistency, delete it before creating the mirror", mirrorTopicCreateDTO.getTopicName()));
                }

                // Write the per-topic mirroring config and notify brokers
                Properties haTopics = kafkaZkClient.getEntityConfigs("ha-topics", mirrorTopicCreateDTO.getTopicName());
                haTopics.put("didi.ha.remote.cluster", String.valueOf(mirrorTopicCreateDTO.getSourceClusterPhyId()));
                haTopics.put("didi.ha.sync.topic.partitions.enabled", "true");
                if (mirrorTopicCreateDTO.getSyncConfig()) {
                    haTopics.put("didi.ha.sync.topic.configs.enabled", "true");
                }
                kafkaZkClient.setOrCreateEntityConfigs("ha-topics", mirrorTopicCreateDTO.getTopicName(), haTopics);
                kafkaZkClient.createConfigChangeNotification("ha-topics/" + mirrorTopicCreateDTO.getTopicName());

                // Persist the active-standby relation
                haActiveStandbyRelationService.batchReplaceTopicHA(mirrorTopicCreateDTO.getSourceClusterPhyId(), mirrorTopicCreateDTO.getDestClusterPhyId(), Collections.singletonList(mirrorTopicCreateDTO.getTopicName()));
            } catch (Exception e) {
                logger.error("method=batchCreateMirrorTopic||topicName={}||errMsg=exception", mirrorTopicCreateDTO.getTopicName(), e);
                return Result.buildFailure(e.getMessage());
            }
        }
        return Result.buildSuc();
    }

    @Override
    public Result<Void> batchDeleteMirrorTopic(List<MirrorTopicDeleteDTO> dtoList) {
        for (MirrorTopicDeleteDTO mirrorTopicDeleteDTO : dtoList) {
            try {
                KafkaZkClient kafkaZkClient = kafkaAdminZKClient.getClient(mirrorTopicDeleteDTO.getDestClusterPhyId());

                // Clear the per-topic mirroring config if present, then notify brokers
                Properties haTopics = kafkaZkClient.getEntityConfigs("ha-topics", mirrorTopicDeleteDTO.getTopicName());
                if (!haTopics.isEmpty()) {
                    kafkaZkClient.setOrCreateEntityConfigs("ha-topics", mirrorTopicDeleteDTO.getTopicName(), new Properties());
                    kafkaZkClient.createConfigChangeNotification("ha-topics/" + mirrorTopicDeleteDTO.getTopicName());
                }

                haActiveStandbyRelationService.batchDeleteTopicHA(mirrorTopicDeleteDTO.getSourceClusterPhyId(), mirrorTopicDeleteDTO.getDestClusterPhyId(), Collections.singletonList(mirrorTopicDeleteDTO.getTopicName()));
            } catch (Exception e) {
                logger.error("method=batchDeleteMirrorTopic||topicName={}||errMsg=exception", mirrorTopicDeleteDTO.getTopicName(), e);
                return Result.buildFailure(e.getMessage());
            }
        }
        return Result.buildSuc();
    }

    @Override
    public Result<List<TopicMirrorInfoVO>> getTopicsMirrorInfo(Long clusterPhyId, String topicName) {
        List<HaActiveStandbyRelation> haActiveStandbyRelations = haActiveStandbyRelationService.listByClusterAndType(clusterPhyId, HaResTypeEnum.MIRROR_TOPIC);
        List<TopicMirrorInfoVO> topicMirrorInfoVOList = new ArrayList<>();
        for (HaActiveStandbyRelation activeStandbyRelation : haActiveStandbyRelations) {
            if (!activeStandbyRelation.getResName().equals(topicName)) {
                continue;
            }

            ClusterPhy standbyClusterPhy = clusterPhyService.getClusterByCluster(activeStandbyRelation.getStandbyClusterPhyId());
            ClusterPhy activeClusterPhy = clusterPhyService.getClusterByCluster(activeStandbyRelation.getActiveClusterPhyId());

            TopicMirrorInfoVO topicMirrorInfoVO = new TopicMirrorInfoVO();
            topicMirrorInfoVO.setSourceClusterId(activeStandbyRelation.getActiveClusterPhyId());
            topicMirrorInfoVO.setDestClusterId(activeStandbyRelation.getStandbyClusterPhyId());
            topicMirrorInfoVO.setTopicName(activeStandbyRelation.getResName());
            topicMirrorInfoVO.setSourceClusterName(activeClusterPhy.getName());
            topicMirrorInfoVO.setDestClusterName(standbyClusterPhy.getName());

            // Replication traffic: bytes-in on the standby (destination) cluster
            Result<List<TopicMetrics>> ret = topicMetricService.collectTopicMetricsFromKafka(activeStandbyRelation.getStandbyClusterPhyId(), activeStandbyRelation.getResName(), TOPIC_METRIC_BYTES_IN);
            if (ret.hasData()) {
                topicMirrorInfoVO.setReplicationBytesIn(this.getTopicAggMetric(ret.getData(), TOPIC_METRIC_BYTES_IN));
            }

            // Produce traffic: bytes-in on the active (source) cluster
            ret = topicMetricService.collectTopicMetricsFromKafka(activeStandbyRelation.getActiveClusterPhyId(), activeStandbyRelation.getResName(), TOPIC_METRIC_BYTES_IN);
            if (ret.hasData()) {
                topicMirrorInfoVO.setBytesIn(this.getTopicAggMetric(ret.getData(), TOPIC_METRIC_BYTES_IN));
            }

            // Mirror fetch lag, measured on the standby cluster
            ret = topicMetricService.collectTopicMetricsFromKafka(activeStandbyRelation.getStandbyClusterPhyId(), activeStandbyRelation.getResName(), TOPIC_METRIC_MIRROR_FETCH_LAG);
            if (ret.hasData()) {
                Float lag = ret.getData().get(0).getMetric(TOPIC_METRIC_MIRROR_FETCH_LAG);
                topicMirrorInfoVO.setLag(lag == null ? 0L : lag.longValue());
            }

            topicMirrorInfoVOList.add(topicMirrorInfoVO);
        }
        return Result.buildSuc(topicMirrorInfoVOList);
    }

    /**
     * Return the broker-aggregated value of the given metric, or NaN if absent.
     */
    private Double getTopicAggMetric(List<TopicMetrics> topicMetricsList, String metricName) {
        for (TopicMetrics topicMetrics : topicMetricsList) {
            if (topicMetrics.isBBrokerAgg()) {
                Float value = topicMetrics.getMetric(metricName);
                if (value != null) {
                    return value.doubleValue();
                }
            }
        }
        return Double.NaN;
    }
}
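Both write paths above lean on Kafka's standard dynamic-config mechanism: setOrCreateEntityConfigs persists the Properties under /config/<entityType>/<entityName>, and createConfigChangeNotification appends a sequential znode under /config/changes that brokers watch and react to by re-reading the entity config. As a hedged illustration (topic name and sequence number hypothetical), creating a mirror of order-events from cluster 1 would leave znodes like:

/config/ha-clusters/1                        source-cluster connection properties
/config/ha-topics/order-events               didi.ha.* mirroring flags
/config/changes/config_change_0000000042     notification that triggers the broker reload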

View File

@@ -6,6 +6,7 @@ import com.didiglobal.logi.security.common.dto.oplog.OplogDTO;
import com.xiaojukeji.know.streaming.km.common.annotations.enterprise.EnterpriseTesting;
import com.xiaojukeji.know.streaming.km.common.bean.dto.partition.PartitionOffsetDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec;
import com.xiaojukeji.know.streaming.km.common.bean.entity.record.RecordHeaderKS;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
@@ -35,7 +36,6 @@ import com.xiaojukeji.know.streaming.km.testing.common.bean.vo.TestProducerVO;
import com.xiaojukeji.know.streaming.km.testing.common.enums.KafkaConsumerFilterEnum;
import com.xiaojukeji.know.streaming.km.testing.common.enums.KafkaConsumerStartFromEnum;
import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -106,7 +106,7 @@ public class KafkaClientTestManagerImpl implements KafkaClientTestManager {
}
// Fetch the topic's begin offsets
-Result<Map<TopicPartition, Long>> partitionBeginOffsetMapResult = partitionService.getPartitionOffsetFromKafka(dto.getClusterId(), dto.getTopicName(), OffsetSpec.earliest(), null);
+Result<Map<TopicPartition, Long>> partitionBeginOffsetMapResult = partitionService.getPartitionOffsetFromKafka(dto.getClusterId(), dto.getTopicName(), KSOffsetSpec.earliest());
if (partitionBeginOffsetMapResult.failed()) {
return Result.buildFromIgnoreData(partitionBeginOffsetMapResult);
}
@@ -118,7 +118,7 @@ public class KafkaClientTestManagerImpl implements KafkaClientTestManager {
});
// Fetch the topic's end offsets
-Result<Map<TopicPartition, Long>> partitionEndOffsetMapResult = partitionService.getPartitionOffsetFromKafka(dto.getClusterId(), dto.getTopicName(), OffsetSpec.latest(), null);
+Result<Map<TopicPartition, Long>> partitionEndOffsetMapResult = partitionService.getPartitionOffsetFromKafka(dto.getClusterId(), dto.getTopicName(), KSOffsetSpec.latest());
if (partitionEndOffsetMapResult.failed()) {
return Result.buildFromIgnoreData(partitionEndOffsetMapResult);
}
@@ -351,7 +351,7 @@ public class KafkaClientTestManagerImpl implements KafkaClientTestManager {
private Result<List<PartitionOffsetDTO>> getConsumeStartOffset(Long clusterPhyId, String topicName, KafkaConsumerStartFromDTO startFromDTO) throws NotExistException, AdminOperateException {
// Start consuming from the latest offset
if (KafkaConsumerStartFromEnum.LATEST.getCode().equals(startFromDTO.getStartFromType())) {
-Result<Map<TopicPartition, Long>> offsetMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, OffsetSpec.latest(), null);
+Result<Map<TopicPartition, Long>> offsetMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, KSOffsetSpec.latest());
if (offsetMapResult.failed()) {
return Result.buildFromIgnoreData(offsetMapResult);
}
@@ -365,7 +365,7 @@ public class KafkaClientTestManagerImpl implements KafkaClientTestManager {
// Start consuming from the earliest offset
if (KafkaConsumerStartFromEnum.EARLIEST.getCode().equals(startFromDTO.getStartFromType())) {
-Result<Map<TopicPartition, Long>> offsetMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, OffsetSpec.earliest(), null);
+Result<Map<TopicPartition, Long>> offsetMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, KSOffsetSpec.earliest());
if (offsetMapResult.failed()) {
return Result.buildFromIgnoreData(offsetMapResult);
}
@@ -379,7 +379,7 @@ public class KafkaClientTestManagerImpl implements KafkaClientTestManager {
// Start consuming from a specified timestamp
if (KafkaConsumerStartFromEnum.PRECISE_TIMESTAMP.getCode().equals(startFromDTO.getStartFromType())) {
-Result<Map<TopicPartition, Long>> offsetMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, OffsetSpec.forTimestamp(startFromDTO.getTimestampUnitMs()), startFromDTO.getTimestampUnitMs());
+Result<Map<TopicPartition, Long>> offsetMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, KSOffsetSpec.forTimestamp(startFromDTO.getTimestampUnitMs()));
if (offsetMapResult.failed()) {
return Result.buildFromIgnoreData(offsetMapResult);
}
@@ -409,7 +409,7 @@ public class KafkaClientTestManagerImpl implements KafkaClientTestManager {
// Start consuming from the most recent X records
if (KafkaConsumerStartFromEnum.LATEST_MINUS_X_OFFSET.getCode().equals(startFromDTO.getStartFromType())) {
-Result<Map<TopicPartition, Long>> offsetMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, OffsetSpec.latest(), null);
+Result<Map<TopicPartition, Long>> offsetMapResult = partitionService.getPartitionOffsetFromKafka(clusterPhyId, topicName, KSOffsetSpec.latest());
if (offsetMapResult.failed()) {
return Result.buildFromIgnoreData(offsetMapResult);
}
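This hunk, like the ones above it, replaces Kafka's raw OffsetSpec plus a separate nullable timestamp argument with a single KSOffsetSpec value, so getPartitionOffsetFromKafka carries one parameter instead of two. KSOffsetSpec itself is not shown in this view; the sketch below is a hypothetical reconstruction inferred solely from the three factory calls in these hunks (the field layout and accessors are assumptions):

// Hypothetical reconstruction of KSOffsetSpec based only on the call sites in this diff;
// the real class in km-common may differ.
import org.apache.kafka.clients.admin.OffsetSpec;

public class KSOffsetSpec {
    private final OffsetSpec offsetSpec;  // the wrapped Kafka admin spec
    private final Long timestampUnitMs;   // non-null only for timestamp lookups

    private KSOffsetSpec(OffsetSpec offsetSpec, Long timestampUnitMs) {
        this.offsetSpec = offsetSpec;
        this.timestampUnitMs = timestampUnitMs;
    }

    public static KSOffsetSpec earliest() {
        return new KSOffsetSpec(OffsetSpec.earliest(), null);
    }

    public static KSOffsetSpec latest() {
        return new KSOffsetSpec(OffsetSpec.latest(), null);
    }

    public static KSOffsetSpec forTimestamp(Long timestampUnitMs) {
        return new KSOffsetSpec(OffsetSpec.forTimestamp(timestampUnitMs), timestampUnitMs);
    }

    public OffsetSpec getOffsetSpec() { return offsetSpec; }      // assumed accessor
    public Long getTimestampUnitMs() { return timestampUnitMs; }  // assumed accessor
}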