diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterTopicsManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterTopicsManagerImpl.java index 17a6c63c..3a2b11ef 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterTopicsManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/cluster/impl/ClusterTopicsManagerImpl.java @@ -14,10 +14,12 @@ import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.res.ClusterPhyTop import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricMultiLinesVO; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.converter.TopicVOConverter; +import com.xiaojukeji.know.streaming.km.common.enums.ha.HaResTypeEnum; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.PaginationMetricsUtil; import com.xiaojukeji.know.streaming.km.common.utils.PaginationUtil; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; +import com.xiaojukeji.know.streaming.km.core.service.ha.HaActiveStandbyRelationService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import org.springframework.beans.factory.annotation.Autowired; @@ -38,6 +40,9 @@ public class ClusterTopicsManagerImpl implements ClusterTopicsManager { @Autowired private TopicMetricService topicMetricService; + @Autowired + private HaActiveStandbyRelationService haActiveStandbyRelationService; + @Override public PaginationResult getClusterPhyTopicsOverview(Long clusterPhyId, ClusterTopicsOverviewDTO dto) { // 获取集群所有的Topic信息 @@ -46,8 +51,11 @@ public class ClusterTopicsManagerImpl implements ClusterTopicsManager { // 获取集群所有Topic的指标 Map metricsMap = topicMetricService.getLatestMetricsFromCache(clusterPhyId); + // 获取HA信息 + Set haTopicNameSet = haActiveStandbyRelationService.listByClusterAndType(clusterPhyId, HaResTypeEnum.MIRROR_TOPIC).stream().map(elem -> elem.getResName()).collect(Collectors.toSet()); + // 转换成vo - List voList = TopicVOConverter.convert2ClusterPhyTopicsOverviewVOList(topicList, metricsMap); + List voList = TopicVOConverter.convert2ClusterPhyTopicsOverviewVOList(topicList, metricsMap, haTopicNameSet); // 请求分页信息 PaginationResult voPaginationResult = this.pagingTopicInLocal(voList, dto); diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/ha/mirror/MirrorTopicCreateDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/ha/mirror/MirrorTopicCreateDTO.java new file mode 100644 index 00000000..58182670 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/ha/mirror/MirrorTopicCreateDTO.java @@ -0,0 +1,38 @@ +package com.xiaojukeji.know.streaming.km.common.bean.dto.ha.mirror; + +import com.xiaojukeji.know.streaming.km.common.bean.dto.BaseDTO; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; + +/** + * @author zengqiao + * @date 20/4/23 + */ +@Data +@ApiModel(description="Topic镜像信息") +public class MirrorTopicCreateDTO extends BaseDTO { + @Min(value = 0, message = "sourceClusterPhyId不允许为空,且最小值为0") + @ApiModelProperty(value = "源集群ID", example = "3") + private Long sourceClusterPhyId; + + @Min(value = 0, message = "destClusterPhyId不允许为空,且最小值为0") + @ApiModelProperty(value = "目标集群ID", example = "3") + private Long destClusterPhyId; + + @NotBlank(message = "topicName不允许为空串") + @ApiModelProperty(value = "Topic名称", example = "mirrorTopic") + private String topicName; + + @NotNull(message = "syncData不允许为空") + @ApiModelProperty(value = "同步数据", example = "true") + private Boolean syncData; + + @NotNull(message = "syncConfig不允许为空") + @ApiModelProperty(value = "同步配置", example = "false") + private Boolean syncConfig; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/ha/mirror/MirrorTopicDeleteDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/ha/mirror/MirrorTopicDeleteDTO.java new file mode 100644 index 00000000..8b7d0095 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/ha/mirror/MirrorTopicDeleteDTO.java @@ -0,0 +1,29 @@ +package com.xiaojukeji.know.streaming.km.common.bean.dto.ha.mirror; + +import com.xiaojukeji.know.streaming.km.common.bean.dto.BaseDTO; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotBlank; + +/** + * @author zengqiao + * @date 20/4/23 + */ +@Data +@ApiModel(description="Topic镜像信息") +public class MirrorTopicDeleteDTO extends BaseDTO { + @Min(value = 0, message = "sourceClusterPhyId不允许为空,且最小值为0") + @ApiModelProperty(value = "源集群ID", example = "3") + private Long sourceClusterPhyId; + + @Min(value = 0, message = "destClusterPhyId不允许为空,且最小值为0") + @ApiModelProperty(value = "目标集群ID", example = "3") + private Long destClusterPhyId; + + @NotBlank(message = "topicName不允许为空串") + @ApiModelProperty(value = "Topic名称", example = "mirrorTopic") + private String topicName; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/ha/HaActiveStandbyRelation.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/ha/HaActiveStandbyRelation.java new file mode 100644 index 00000000..db306641 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/ha/HaActiveStandbyRelation.java @@ -0,0 +1,23 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.ha; + +import com.xiaojukeji.know.streaming.km.common.bean.po.BasePO; +import com.xiaojukeji.know.streaming.km.common.enums.ha.HaResTypeEnum; +import lombok.Data; + +@Data +public class HaActiveStandbyRelation extends BasePO { + private Long activeClusterPhyId; + + private Long standbyClusterPhyId; + + /** + * 资源名称 + */ + private String resName; + + /** + * 资源类型,0:集群,1:镜像Topic,2:主备Topic + * @see HaResTypeEnum + */ + private Integer resType; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/ha/HaActiveStandbyRelationPO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/ha/HaActiveStandbyRelationPO.java new file mode 100644 index 00000000..e019a4f0 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/ha/HaActiveStandbyRelationPO.java @@ -0,0 +1,33 @@ +package com.xiaojukeji.know.streaming.km.common.bean.po.ha; + +import com.baomidou.mybatisplus.annotation.TableName; +import com.xiaojukeji.know.streaming.km.common.bean.po.BasePO; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@TableName(Constant.MYSQL_HA_TABLE_NAME_PREFIX + "active_standby_relation") +public class HaActiveStandbyRelationPO extends BasePO { + private Long activeClusterPhyId; + + private Long standbyClusterPhyId; + + /** + * 资源名称 + */ + private String resName; + + /** + * 资源类型,0:集群,1:镜像Topic,2:主备Topic + */ + private Integer resType; + + public HaActiveStandbyRelationPO(Long activeClusterPhyId, Long standbyClusterPhyId, String resName, Integer resType) { + this.activeClusterPhyId = activeClusterPhyId; + this.standbyClusterPhyId = standbyClusterPhyId; + this.resName = resName; + this.resType = resType; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/res/ClusterPhyTopicsOverviewVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/res/ClusterPhyTopicsOverviewVO.java index a767b9f2..10258bf1 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/res/ClusterPhyTopicsOverviewVO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/res/ClusterPhyTopicsOverviewVO.java @@ -32,6 +32,9 @@ public class ClusterPhyTopicsOverviewVO extends BaseTimeVO { @ApiModelProperty(value = "副本数", example = "2") private Integer replicaNum; + @ApiModelProperty(value = "处于镜像复制中", example = "true") + private Boolean inMirror; + @ApiModelProperty(value = "多个指标的当前值, 包括健康分/LogSize等") private BaseMetrics latestMetrics; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/ha/mirror/TopicMirrorInfoVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/ha/mirror/TopicMirrorInfoVO.java new file mode 100644 index 00000000..b1f748c0 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/ha/mirror/TopicMirrorInfoVO.java @@ -0,0 +1,37 @@ +package com.xiaojukeji.know.streaming.km.common.bean.vo.ha.mirror; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +/** + * @author zengqiao + * @date 20/4/29 + */ +@Data +@ApiModel(description="Topic复制信息") +public class TopicMirrorInfoVO { + @ApiModelProperty(value="源集群ID", example = "1") + private Long sourceClusterId; + + @ApiModelProperty(value="源集群名称", example = "know-streaming-1") + private String sourceClusterName; + + @ApiModelProperty(value="目标集群ID", example = "2") + private Long destClusterId; + + @ApiModelProperty(value="目标集群名称", example = "know-streaming-2") + private String destClusterName; + + @ApiModelProperty(value="Topic名称", example = "know-streaming") + private String topicName; + + @ApiModelProperty(value="写入速率(bytes/s)", example = "100") + private Double bytesIn; + + @ApiModelProperty(value="复制速率(bytes/s)", example = "100") + private Double replicationBytesIn; + + @ApiModelProperty(value="延迟消息数", example = "100") + private Long lag; +} \ No newline at end of file diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java index c8f4075b..5ba73baa 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/Constant.java @@ -46,6 +46,7 @@ public class Constant { public static final String MYSQL_TABLE_NAME_PREFIX = "ks_km_"; public static final String MYSQL_KC_TABLE_NAME_PREFIX = "ks_kc_"; + public static final String MYSQL_HA_TABLE_NAME_PREFIX = "ks_ha_"; public static final String SWAGGER_API_TAG_PREFIX = "KS-KM-"; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/TopicVOConverter.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/TopicVOConverter.java index 0240fde1..14afdd9f 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/TopicVOConverter.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/TopicVOConverter.java @@ -77,7 +77,7 @@ public class TopicVOConverter { return vo; } - public static List convert2ClusterPhyTopicsOverviewVOList(List topicList, Map metricsMap) { + public static List convert2ClusterPhyTopicsOverviewVOList(List topicList, Map metricsMap, Set haTopicNameSet) { List voList = new ArrayList<>(); for (Topic topic: topicList) { ClusterPhyTopicsOverviewVO vo = new ClusterPhyTopicsOverviewVO(); @@ -92,6 +92,7 @@ public class TopicVOConverter { vo.setLatestMetrics(metricsMap.getOrDefault(topic.getTopicName(), new TopicMetrics(topic.getTopicName(), topic.getClusterPhyId()))); + vo.setInMirror(haTopicNameSet.contains(topic.getTopicName())); voList.add(vo); } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/ha/HaResTypeEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/ha/HaResTypeEnum.java new file mode 100644 index 00000000..1821f7ab --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/ha/HaResTypeEnum.java @@ -0,0 +1,25 @@ +package com.xiaojukeji.know.streaming.km.common.enums.ha; + +import lombok.Getter; + +/** + * @author zengqiao + * @date 20/7/28 + */ +@Getter +public enum HaResTypeEnum { + CLUSTER(0, "Cluster"), + + MIRROR_TOPIC(1, "镜像Topic"), + + ; + + private final int code; + + private final String msg; + + HaResTypeEnum(int code, String msg) { + this.code = code; + this.msg = msg; + } +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/DatabaseDataFlusher.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/DatabaseDataFlusher.java index b91e27c1..d5ae89e3 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/DatabaseDataFlusher.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/DatabaseDataFlusher.java @@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.core.flusher; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.entity.ha.HaActiveStandbyRelation; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.ClusterMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.partition.Partition; @@ -13,6 +14,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; import com.xiaojukeji.know.streaming.km.persistence.cache.DataBaseDataLocalCache; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterMetricService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; +import com.xiaojukeji.know.streaming.km.core.service.ha.HaActiveStandbyRelationService; import com.xiaojukeji.know.streaming.km.core.service.health.checkresult.HealthCheckResultService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService; @@ -50,6 +52,9 @@ public class DatabaseDataFlusher { @Autowired private PartitionService partitionService; + @Autowired + private HaActiveStandbyRelationService haActiveStandbyRelationService; + @PostConstruct public void init() { this.flushPartitionsCache(); @@ -59,6 +64,8 @@ public class DatabaseDataFlusher { this.flushTopicLatestMetricsCache(); this.flushHealthCheckResultCache(); + + this.flushHaTopicCache(); } @Scheduled(cron="0 0/1 * * * ?") @@ -159,4 +166,12 @@ public class DatabaseDataFlusher { }); } } + + @Scheduled(cron="0 0/1 * * * ?") + public void flushHaTopicCache() { + List haTopicList = haActiveStandbyRelationService.listAllTopicHa(); + for (HaActiveStandbyRelation topic : haTopicList) { + DataBaseDataLocalCache.putHaTopic(topic.getStandbyClusterPhyId(), topic.getResName()); + } + } } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/ha/HaActiveStandbyRelationService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/ha/HaActiveStandbyRelationService.java new file mode 100644 index 00000000..934125de --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/ha/HaActiveStandbyRelationService.java @@ -0,0 +1,25 @@ +package com.xiaojukeji.know.streaming.km.core.service.ha; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.ha.HaActiveStandbyRelation; +import com.xiaojukeji.know.streaming.km.common.enums.ha.HaResTypeEnum; + +import java.util.List; + +public interface HaActiveStandbyRelationService { + /** + * 新增或者变更,支持幂等 + */ + void batchReplaceTopicHA(Long activeClusterPhyId, Long standbyClusterPhyId, List topicNameList); + + /** + * 删除 + */ + void batchDeleteTopicHA(Long activeClusterPhyId, Long standbyClusterPhyId, List topicNameList); + + /** + * 按照集群ID查询 + */ + List listByClusterAndType(Long firstClusterId, HaResTypeEnum haResTypeEnum); + + List listAllTopicHa(); +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/ha/impl/HaActiveStandbyRelationServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/ha/impl/HaActiveStandbyRelationServiceImpl.java new file mode 100644 index 00000000..7c194edd --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/ha/impl/HaActiveStandbyRelationServiceImpl.java @@ -0,0 +1,106 @@ +package com.xiaojukeji.know.streaming.km.core.service.ha.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.xiaojukeji.know.streaming.km.common.bean.entity.ha.HaActiveStandbyRelation; +import com.xiaojukeji.know.streaming.km.common.bean.po.ha.HaActiveStandbyRelationPO; +import com.xiaojukeji.know.streaming.km.common.enums.ha.HaResTypeEnum; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; +import com.xiaojukeji.know.streaming.km.core.service.ha.HaActiveStandbyRelationService; +import com.xiaojukeji.know.streaming.km.persistence.mysql.ha.HaActiveStandbyRelationDAO; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DuplicateKeyException; +import org.springframework.stereotype.Service; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static com.xiaojukeji.know.streaming.km.common.enums.ha.HaResTypeEnum.MIRROR_TOPIC; + +@Service +public class HaActiveStandbyRelationServiceImpl implements HaActiveStandbyRelationService { + @Autowired + private HaActiveStandbyRelationDAO haActiveStandbyRelationDAO; + + @Override + public void batchReplaceTopicHA(Long activeClusterPhyId, Long standbyClusterPhyId, List topicNameList) { + Map poMap = this.listPOs(activeClusterPhyId, standbyClusterPhyId, MIRROR_TOPIC) + .stream() + .collect(Collectors.toMap(HaActiveStandbyRelationPO::getResName, Function.identity())); + for (String topicName: topicNameList) { + HaActiveStandbyRelationPO oldPO = poMap.get(topicName); + if (oldPO != null) { + continue; + } + + try { + haActiveStandbyRelationDAO.insert(new HaActiveStandbyRelationPO(activeClusterPhyId, standbyClusterPhyId, topicName, MIRROR_TOPIC.getCode())); + } catch (DuplicateKeyException dke) { + // ignore + } + } + } + + @Override + public void batchDeleteTopicHA(Long activeClusterPhyId, Long standbyClusterPhyId, List topicNameList) { + Map poMap = this.listPOs(activeClusterPhyId, standbyClusterPhyId, MIRROR_TOPIC) + .stream() + .collect(Collectors.toMap(HaActiveStandbyRelationPO::getResName, Function.identity())); + for (String topicName: topicNameList) { + HaActiveStandbyRelationPO oldPO = poMap.get(topicName); + if (oldPO == null) { + continue; + } + + haActiveStandbyRelationDAO.deleteById(oldPO.getId()); + } + } + + @Override + public List listByClusterAndType(Long firstClusterId, HaResTypeEnum haResTypeEnum) { + // 查询HA列表 + List poList = this.listPOs(firstClusterId, haResTypeEnum); + if (ValidateUtils.isNull(poList)) { + return new ArrayList<>(); + } + + return ConvertUtil.list2List(poList, HaActiveStandbyRelation.class); + } + + @Override + public List listAllTopicHa() { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(HaActiveStandbyRelationPO::getResType, MIRROR_TOPIC.getCode()); + List poList = haActiveStandbyRelationDAO.selectList(lambdaQueryWrapper); + if (ValidateUtils.isNull(poList)) { + return new ArrayList<>(); + } + + return ConvertUtil.list2List(poList, HaActiveStandbyRelation.class); + } + + private List listPOs(Long firstClusterId, HaResTypeEnum haResTypeEnum) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(HaActiveStandbyRelationPO::getResType, haResTypeEnum.getCode()); + lambdaQueryWrapper.and(lambda -> + lambda.eq(HaActiveStandbyRelationPO::getActiveClusterPhyId, firstClusterId).or().eq(HaActiveStandbyRelationPO::getStandbyClusterPhyId, firstClusterId) + ); + + // 查询HA列表 + return haActiveStandbyRelationDAO.selectList(lambdaQueryWrapper); + } + + private List listPOs(Long activeClusterPhyId, Long standbyClusterPhyId, HaResTypeEnum haResTypeEnum) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(HaActiveStandbyRelationPO::getResType, haResTypeEnum.getCode()); + lambdaQueryWrapper.eq(HaActiveStandbyRelationPO::getActiveClusterPhyId, activeClusterPhyId); + lambdaQueryWrapper.eq(HaActiveStandbyRelationPO::getStandbyClusterPhyId, standbyClusterPhyId); + + + // 查询HA列表 + return haActiveStandbyRelationDAO.selectList(lambdaQueryWrapper); + } +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/OpTopicServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/OpTopicServiceImpl.java index 5a7b0f76..466f7a2f 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/OpTopicServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/OpTopicServiceImpl.java @@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.core.service.topic.impl; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.didiglobal.logi.security.common.dto.oplog.OplogDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.ha.HaActiveStandbyRelation; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam; @@ -12,11 +13,13 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.converter.TopicConverter; +import com.xiaojukeji.know.streaming.km.common.enums.ha.HaResTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException; +import com.xiaojukeji.know.streaming.km.core.service.ha.HaActiveStandbyRelationService; import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService; import com.xiaojukeji.know.streaming.km.core.service.topic.OpTopicService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; @@ -70,6 +73,9 @@ public class OpTopicServiceImpl extends BaseKafkaVersionControlService implement @Autowired private KafkaZKDAO kafkaZKDAO; + @Autowired + private HaActiveStandbyRelationService haActiveStandbyRelationService; + @Override protected VersionItemTypeEnum getVersionItemType() { return SERVICE_OP_TOPIC; @@ -138,6 +144,25 @@ public class OpTopicServiceImpl extends BaseKafkaVersionControlService implement // 删除DB中的Topic数据 topicService.deleteTopicInDB(param.getClusterPhyId(), param.getTopicName()); + //解除高可用Topic关联 + List haActiveStandbyRelations = haActiveStandbyRelationService.listByClusterAndType(param.getClusterPhyId(), HaResTypeEnum.MIRROR_TOPIC); + for (HaActiveStandbyRelation activeStandbyRelation : haActiveStandbyRelations) { + if (activeStandbyRelation.getResName().equals(param.getTopicName())) { + try { + KafkaZkClient kafkaZkClient = kafkaAdminZKClient.getClient(activeStandbyRelation.getStandbyClusterPhyId()); + Properties haTopics = kafkaZkClient.getEntityConfigs("ha-topics", activeStandbyRelation.getResName()); + if (haTopics.size() != 0) { + kafkaZkClient.setOrCreateEntityConfigs("ha-topics", activeStandbyRelation.getResName(), new Properties()); + kafkaZkClient.createConfigChangeNotification("ha-topics/" + activeStandbyRelation.getResName()); + } + haActiveStandbyRelationService.batchDeleteTopicHA(activeStandbyRelation.getActiveClusterPhyId(), activeStandbyRelation.getStandbyClusterPhyId(), Collections.singletonList(activeStandbyRelation.getResName())); + } catch (Exception e) { + log.error("method=deleteTopic||topicName:{}||errMsg=exception", activeStandbyRelation.getResName(), e); + return Result.buildFailure(e.getMessage()); + } + } + } + // 记录操作 OplogDTO oplogDTO = new OplogDTO(operator, OperationEnum.DELETE.getDesc(), diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java index bc275821..d42fb5db 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicMetricServiceImpl.java @@ -61,7 +61,7 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe public static final String TOPIC_METHOD_GET_METRIC_FROM_KAFKA_BY_TOTAL_PARTITION_OF_BROKER_JMX = "getMetricFromKafkaByTotalPartitionOfBrokerJmx"; public static final String TOPIC_METHOD_GET_MESSAGES = "getMessages"; public static final String TOPIC_METHOD_GET_REPLICAS_COUNT = "getReplicasCount"; - + public static final String TOPIC_METHOD_GET_TOPIC_MIRROR_FETCH_LAG = "getTopicMirrorFetchLag"; @Autowired private HealthStateService healthStateService; @@ -98,6 +98,7 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe registerVCHandler( TOPIC_METHOD_GET_METRIC_FROM_KAFKA_BY_TOTAL_PARTITION_OF_BROKER_JMX, this::getMetricFromKafkaByTotalPartitionOfBrokerJmx ); registerVCHandler( TOPIC_METHOD_GET_REPLICAS_COUNT, this::getReplicasCount); registerVCHandler( TOPIC_METHOD_GET_MESSAGES, this::getMessages); + registerVCHandler( TOPIC_METHOD_GET_TOPIC_MIRROR_FETCH_LAG, this::getTopicMirrorFetchLag); } @Override @@ -502,4 +503,41 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe return aliveBrokerList.stream().filter(elem -> topic.getBrokerIdSet().contains(elem.getBrokerId())).collect(Collectors.toList()); } + + private Result> getTopicMirrorFetchLag(VersionItemParam param) { + TopicMetricParam topicMetricParam = (TopicMetricParam)param; + + String topic = topicMetricParam.getTopic(); + Long clusterId = topicMetricParam.getClusterId(); + String metric = topicMetricParam.getMetric(); + + VersionJmxInfo jmxInfo = getJMXInfo(clusterId, metric); + if(null == jmxInfo){return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST);} + + if (!DataBaseDataLocalCache.isHaTopic(clusterId, topic)) { + return Result.buildFailure(NOT_EXIST); + } + + List brokers = this.listAliveBrokersByTopic(clusterId, topic); + if(CollectionUtils.isEmpty(brokers)){return Result.buildFailure(BROKER_NOT_EXIST);} + + Float sumLag = 0f; + for (Broker broker : brokers) { + JmxConnectorWrap jmxConnectorWrap = kafkaJMXClient.getClientWithCheck(clusterId, broker.getBrokerId()); + try { + String jmxObjectName = String.format(jmxInfo.getJmxObjectName(), topic); + Set objectNameSet = jmxConnectorWrap.queryNames(new ObjectName(jmxObjectName), null); + for (ObjectName name : objectNameSet) { + Object attribute = jmxConnectorWrap.getAttribute(name, jmxInfo.getJmxAttribute()); + sumLag += Float.valueOf(attribute.toString()); + } + } catch (Exception e) { + LOGGER.error("method=getTopicMirrorFetchLag||cluster={}||brokerId={}||topic={}||metrics={}||jmx={}||msg={}", + clusterId, broker.getBrokerId(), topic, metric, jmxInfo.getJmxObjectName(), e.getClass().getName()); + } + } + TopicMetrics topicMetric = new TopicMetrics(topic, clusterId, true); + topicMetric.putMetric(metric, sumLag); + return Result.buildSuc(Arrays.asList(topicMetric)); + } } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/fe/FrontEndControlVersionItems.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/fe/FrontEndControlVersionItems.java index 769bd225..3f4c0a68 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/fe/FrontEndControlVersionItems.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/fe/FrontEndControlVersionItems.java @@ -33,6 +33,9 @@ public class FrontEndControlVersionItems extends BaseMetricVersionMetric { private static final String FE_CREATE_TOPIC_CLEANUP_POLICY = "FECreateTopicCleanupPolicy"; + private static final String FE_HA_CREATE_MIRROR_TOPIC = "FEHaCreateMirrorTopic"; + private static final String FE_HA_DELETE_MIRROR_TOPIC = "FEHaDeleteMirrorTopic"; + public FrontEndControlVersionItems(){} @Override @@ -80,6 +83,12 @@ public class FrontEndControlVersionItems extends BaseMetricVersionMetric { itemList.add(buildItem().minVersion(VersionEnum.V_0_10_1_0).maxVersion(VersionEnum.V_MAX) .name(FE_CREATE_TOPIC_CLEANUP_POLICY).desc("Topic-创建Topic-Cleanup-Policy")); + // HA-Topic复制 + itemList.add(buildItem().minVersion(VersionEnum.V_2_5_0_D_300).maxVersion(VersionEnum.V_2_5_0_D_MAX) + .name(FE_HA_CREATE_MIRROR_TOPIC).desc("HA-创建Topic复制")); + itemList.add(buildItem().minVersion(VersionEnum.V_2_5_0_D_300).maxVersion(VersionEnum.V_2_5_0_D_MAX) + .name(FE_HA_DELETE_MIRROR_TOPIC).desc("HA-取消Topic复制")); + return itemList; } } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/kafka/TopicMetricVersionItems.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/kafka/TopicMetricVersionItems.java index 86296a5d..ed192561 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/kafka/TopicMetricVersionItems.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/metrics/kafka/TopicMetricVersionItems.java @@ -2,6 +2,7 @@ package com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka; import com.xiaojukeji.know.streaming.km.common.bean.entity.version.VersionMetricControlItem; import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.BaseMetricVersionMetric; import org.springframework.stereotype.Component; @@ -36,6 +37,8 @@ public class TopicMetricVersionItems extends BaseMetricVersionMetric { public static final String TOPIC_METRIC_BYTES_OUT_MIN_15 = "BytesOut_min_15"; public static final String TOPIC_METRIC_LOG_SIZE = "LogSize"; public static final String TOPIC_METRIC_UNDER_REPLICA_PARTITIONS = "PartitionURP"; + + public static final String TOPIC_METRIC_MIRROR_FETCH_LAG = "MirrorFetchLag"; public static final String TOPIC_METRIC_COLLECT_COST_TIME = Constant.COLLECT_METRICS_COST_TIME_METRICS_NAME; @Override @@ -148,6 +151,11 @@ public class TopicMetricVersionItems extends BaseMetricVersionMetric { .name(TOPIC_METRIC_COLLECT_COST_TIME).unit("秒").desc("采集Topic指标的耗时").category(CATEGORY_PERFORMANCE) .extendMethod(TOPIC_METHOD_DO_NOTHING)); + itemList.add(buildItem().minVersion(VersionEnum.V_2_5_0_D_300).maxVersion(VersionEnum.V_2_5_0_D_MAX) + .name(TOPIC_METRIC_MIRROR_FETCH_LAG).unit("条").desc("Topic复制延迟消息数").category(CATEGORY_FLOW) + .extend(buildJMXMethodExtend(TOPIC_METHOD_GET_TOPIC_MIRROR_FETCH_LAG) + .jmxObjectName(JMX_SERVER_TOPIC_MIRROR).jmxAttribute(VALUE))); + return itemList; } } diff --git a/km-enterprise/km-ha/pom.xml b/km-enterprise/km-ha/pom.xml new file mode 100644 index 00000000..88a2e1e0 --- /dev/null +++ b/km-enterprise/km-ha/pom.xml @@ -0,0 +1,31 @@ + + + 4.0.0 + com.xiaojukeji.kafka + km-ha + ${km.revision} + jar + + + km + com.xiaojukeji.kafka + ${km.revision} + ../../pom.xml + + + + + + com.xiaojukeji.kafka + km-common + ${project.parent.version} + + + com.xiaojukeji.kafka + km-core + ${project.parent.version} + + + \ No newline at end of file diff --git a/km-enterprise/km-ha/src/main/java/com/xiaojukeji/know/streaming/km/ha/mirror/service/MirrorTopicService.java b/km-enterprise/km-ha/src/main/java/com/xiaojukeji/know/streaming/km/ha/mirror/service/MirrorTopicService.java new file mode 100644 index 00000000..96830b0f --- /dev/null +++ b/km-enterprise/km-ha/src/main/java/com/xiaojukeji/know/streaming/km/ha/mirror/service/MirrorTopicService.java @@ -0,0 +1,30 @@ +package com.xiaojukeji.know.streaming.km.ha.mirror.service; + +import com.xiaojukeji.know.streaming.km.common.bean.dto.ha.mirror.MirrorTopicCreateDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.ha.mirror.MirrorTopicDeleteDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.vo.ha.mirror.TopicMirrorInfoVO; + +import java.util.List; + +public interface MirrorTopicService { + + /** + * @param dtoList + * @return + */ + Result batchCreateMirrorTopic(List dtoList); + + /** + * @param dtoList + * @return + */ + Result batchDeleteMirrorTopic(List dtoList); + + /** + * @param clusterPhyId + * @param topicName + * @return + */ + Result> getTopicsMirrorInfo(Long clusterPhyId, String topicName); +} diff --git a/km-enterprise/km-ha/src/main/java/com/xiaojukeji/know/streaming/km/ha/mirror/service/impl/MirrorTopicServiceImpl.java b/km-enterprise/km-ha/src/main/java/com/xiaojukeji/know/streaming/km/ha/mirror/service/impl/MirrorTopicServiceImpl.java new file mode 100644 index 00000000..15770ef7 --- /dev/null +++ b/km-enterprise/km-ha/src/main/java/com/xiaojukeji/know/streaming/km/ha/mirror/service/impl/MirrorTopicServiceImpl.java @@ -0,0 +1,151 @@ +package com.xiaojukeji.know.streaming.km.ha.mirror.service.impl; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.dto.ha.mirror.MirrorTopicCreateDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.ha.mirror.MirrorTopicDeleteDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.entity.ha.HaActiveStandbyRelation; +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.TopicMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.vo.ha.mirror.TopicMirrorInfoVO; +import com.xiaojukeji.know.streaming.km.common.enums.ha.HaResTypeEnum; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; +import com.xiaojukeji.know.streaming.km.core.service.ha.HaActiveStandbyRelationService; +import com.xiaojukeji.know.streaming.km.core.service.topic.TopicMetricService; +import com.xiaojukeji.know.streaming.km.ha.mirror.service.MirrorTopicService; +import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient; +import kafka.zk.KafkaZkClient; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.*; + +import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems.TOPIC_METRIC_BYTES_IN; +import static com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.TopicMetricVersionItems.TOPIC_METRIC_MIRROR_FETCH_LAG; + +@Service +public class MirrorTopicServiceImpl implements MirrorTopicService { + private static final ILog logger = LogFactory.getLog(MirrorTopicServiceImpl.class); + + @Autowired + private ClusterPhyService clusterPhyService; + + @Autowired + private TopicMetricService topicMetricService; + + @Autowired + private KafkaAdminZKClient kafkaAdminZKClient; + + @Autowired + private HaActiveStandbyRelationService haActiveStandbyRelationService; + + @Override + public Result batchCreateMirrorTopic(List dtoList) { + for (MirrorTopicCreateDTO mirrorTopicCreateDTO : dtoList) { + try { + KafkaZkClient kafkaZkClient = kafkaAdminZKClient.getClient(mirrorTopicCreateDTO.getDestClusterPhyId()); + ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(mirrorTopicCreateDTO.getSourceClusterPhyId()); + Properties newHaClusters = ConvertUtil.str2ObjByJson(clusterPhy.getClientProperties(), Properties.class); + newHaClusters.put("bootstrap.servers", clusterPhy.getBootstrapServers()); + if(clusterPhy.getKafkaVersion().contains("2.5.0-d-")){ + newHaClusters.put("didi.kafka.enable", "true"); + }else{ + newHaClusters.put("didi.kafka.enable", "false"); + } + Properties oldHaClusters = kafkaZkClient.getEntityConfigs("ha-clusters", String.valueOf(mirrorTopicCreateDTO.getSourceClusterPhyId())); + if (!oldHaClusters.equals(newHaClusters)) { + kafkaZkClient.setOrCreateEntityConfigs("ha-clusters", String.valueOf(mirrorTopicCreateDTO.getSourceClusterPhyId()), newHaClusters); + kafkaZkClient.createConfigChangeNotification("ha-clusters/" + mirrorTopicCreateDTO.getSourceClusterPhyId()); + } + boolean pathExists = kafkaZkClient.pathExists("/brokers/topics/" + mirrorTopicCreateDTO.getTopicName()); + if (pathExists) { + return Result.buildFailure(String.format("目标集群已存在%s,保证数据一致性,请删除后再创建", mirrorTopicCreateDTO.getTopicName())); + } + Properties haTopics = kafkaZkClient.getEntityConfigs("ha-topics", mirrorTopicCreateDTO.getTopicName()); + haTopics.put("didi.ha.remote.cluster", String.valueOf(mirrorTopicCreateDTO.getSourceClusterPhyId())); + haTopics.put("didi.ha.sync.topic.partitions.enabled", "true"); + if (mirrorTopicCreateDTO.getSyncConfig()) { + haTopics.put("didi.ha.sync.topic.configs.enabled", "true"); + } + kafkaZkClient.setOrCreateEntityConfigs("ha-topics", mirrorTopicCreateDTO.getTopicName(), haTopics); + kafkaZkClient.createConfigChangeNotification("ha-topics/" + mirrorTopicCreateDTO.getTopicName()); + haActiveStandbyRelationService.batchReplaceTopicHA(mirrorTopicCreateDTO.getSourceClusterPhyId(), mirrorTopicCreateDTO.getDestClusterPhyId(), Collections.singletonList(mirrorTopicCreateDTO.getTopicName())); + } catch (Exception e) { + logger.error("method=batchCreateMirrorTopic||topicName:{}||errMsg=exception", mirrorTopicCreateDTO.getTopicName(), e); + return Result.buildFailure(e.getMessage()); + } + } + return Result.buildSuc(); + } + + @Override + public Result batchDeleteMirrorTopic(List dtoList) { + for (MirrorTopicDeleteDTO mirrorTopicDeleteDTO : dtoList) { + try { + KafkaZkClient kafkaZkClient = kafkaAdminZKClient.getClient(mirrorTopicDeleteDTO.getDestClusterPhyId()); + Properties haTopics = kafkaZkClient.getEntityConfigs("ha-topics", mirrorTopicDeleteDTO.getTopicName()); + if (haTopics.size() != 0) { + kafkaZkClient.setOrCreateEntityConfigs("ha-topics", mirrorTopicDeleteDTO.getTopicName(), new Properties()); + kafkaZkClient.createConfigChangeNotification("ha-topics/" + mirrorTopicDeleteDTO.getTopicName()); + } + haActiveStandbyRelationService.batchDeleteTopicHA(mirrorTopicDeleteDTO.getSourceClusterPhyId(), mirrorTopicDeleteDTO.getDestClusterPhyId(), Collections.singletonList(mirrorTopicDeleteDTO.getTopicName())); + } catch (Exception e) { + logger.error("method=batchDeleteMirrorTopic||topicName:{}||errMsg=exception", mirrorTopicDeleteDTO.getTopicName(), e); + return Result.buildFailure(e.getMessage()); + } + } + return Result.buildSuc(); + } + + @Override + public Result> getTopicsMirrorInfo(Long clusterPhyId, String topicName) { + List haActiveStandbyRelations = haActiveStandbyRelationService.listByClusterAndType(clusterPhyId, HaResTypeEnum.MIRROR_TOPIC); + List topicMirrorInfoVOList = new ArrayList<>(); + for (HaActiveStandbyRelation activeStandbyRelation : haActiveStandbyRelations) { + if (activeStandbyRelation.getResName().equals(topicName)) { + ClusterPhy standbyClusterPhy = clusterPhyService.getClusterByCluster(activeStandbyRelation.getStandbyClusterPhyId()); + ClusterPhy activeClusterPhy = clusterPhyService.getClusterByCluster(activeStandbyRelation.getActiveClusterPhyId()); + TopicMirrorInfoVO topicMirrorInfoVO = new TopicMirrorInfoVO(); + topicMirrorInfoVO.setSourceClusterId(activeStandbyRelation.getActiveClusterPhyId()); + topicMirrorInfoVO.setDestClusterId(activeStandbyRelation.getStandbyClusterPhyId()); + topicMirrorInfoVO.setTopicName(activeStandbyRelation.getResName()); + topicMirrorInfoVO.setSourceClusterName(activeClusterPhy.getName()); + topicMirrorInfoVO.setDestClusterName(standbyClusterPhy.getName()); + Result> ret = topicMetricService.collectTopicMetricsFromKafka(activeStandbyRelation.getStandbyClusterPhyId(), activeStandbyRelation.getResName(), TOPIC_METRIC_BYTES_IN); + if (ret.hasData()) { + Double value = this.getTopicAggMetric(ret.getData(), TOPIC_METRIC_BYTES_IN); + topicMirrorInfoVO.setReplicationBytesIn(value); + } + + ret = topicMetricService.collectTopicMetricsFromKafka(activeStandbyRelation.getActiveClusterPhyId(), activeStandbyRelation.getResName(), TOPIC_METRIC_BYTES_IN); + if (ret.hasData()) { + Double value = this.getTopicAggMetric(ret.getData(), TOPIC_METRIC_BYTES_IN); + topicMirrorInfoVO.setBytesIn(value); + } + + ret = topicMetricService.collectTopicMetricsFromKafka(activeStandbyRelation.getStandbyClusterPhyId(), activeStandbyRelation.getResName(), TOPIC_METRIC_MIRROR_FETCH_LAG); + if (ret.hasData()) { + Float lag = ret.getData().get(0).getMetric(TOPIC_METRIC_MIRROR_FETCH_LAG); + topicMirrorInfoVO.setLag(lag == null ? 0 : lag.longValue()); + } + + topicMirrorInfoVOList.add(topicMirrorInfoVO); + } + } + return Result.buildSuc(topicMirrorInfoVOList); + } + + private Double getTopicAggMetric(List topicMetricsList, String metricName) { + for (TopicMetrics topicMetrics : topicMetricsList) { + if (topicMetrics.isBBrokerAgg()) { + Float value = topicMetrics.getMetric(metricName); + if (value != null) { + return value.doubleValue(); + } + } + } + return Double.NaN; + } +} \ No newline at end of file diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/cache/DataBaseDataLocalCache.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/cache/DataBaseDataLocalCache.java index 5c6a1fbf..32ac7ce8 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/cache/DataBaseDataLocalCache.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/cache/DataBaseDataLocalCache.java @@ -29,6 +29,9 @@ public class DataBaseDataLocalCache { @Value(value = "${cache.metadata.health-check-result-size:10000}") private Long healthCheckResultCacheSize; + @Value(value = "${cache.metadata.ha-topic-size:10000}") + private Long haTopicCacheSize; + private static Cache> topicLatestMetricsCache; private static Cache clusterLatestMetricsCache; @@ -36,6 +39,7 @@ public class DataBaseDataLocalCache { private static Cache>> partitionsCache; private static Cache>> healthCheckResultCache; + private static Cache haTopicCache; @PostConstruct private void init() { @@ -58,6 +62,11 @@ public class DataBaseDataLocalCache { .expireAfterWrite(90, TimeUnit.SECONDS) .maximumSize(healthCheckResultCacheSize) .build(); + + haTopicCache = Caffeine.newBuilder() + .expireAfterWrite(90, TimeUnit.SECONDS) + .maximumSize(haTopicCacheSize) + .build(); } public static Map getTopicMetrics(Long clusterPhyId) { @@ -100,6 +109,16 @@ public class DataBaseDataLocalCache { return clusterId * HealthCheckDimensionEnum.MAX_VAL.getDimension() + dimensionCode; } + public static void putHaTopic(Long clusterPhyId, String topicName) { + String key = clusterPhyId + "@" + topicName; + haTopicCache.put(key, true); + } + + public static boolean isHaTopic(Long clusterPhyId, String topicName) { + String key = clusterPhyId + "@" + topicName; + return haTopicCache.getIfPresent(key) != null; + } + /**************************************************** private method ****************************************************/ private DataBaseDataLocalCache() { diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/ha/HaActiveStandbyRelationDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/ha/HaActiveStandbyRelationDAO.java new file mode 100644 index 00000000..9cf182e6 --- /dev/null +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/ha/HaActiveStandbyRelationDAO.java @@ -0,0 +1,9 @@ +package com.xiaojukeji.know.streaming.km.persistence.mysql.ha; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.xiaojukeji.know.streaming.km.common.bean.po.ha.HaActiveStandbyRelationPO; +import org.springframework.stereotype.Repository; + +@Repository +public interface HaActiveStandbyRelationDAO extends BaseMapper { +} diff --git a/km-rest/pom.xml b/km-rest/pom.xml index 091e2696..b86151e3 100644 --- a/km-rest/pom.xml +++ b/km-rest/pom.xml @@ -56,6 +56,12 @@ ${project.parent.version} + + com.xiaojukeji.kafka + km-ha + ${project.parent.version} + + org.apache.kafka kafka-clients diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/enterprise/mirror/MirrorTopicController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/enterprise/mirror/MirrorTopicController.java new file mode 100644 index 00000000..15972678 --- /dev/null +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/enterprise/mirror/MirrorTopicController.java @@ -0,0 +1,51 @@ +package com.xiaojukeji.know.streaming.km.rest.api.v3.enterprise.mirror; + +import com.xiaojukeji.know.streaming.km.common.bean.dto.ha.mirror.MirrorTopicCreateDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.ha.mirror.MirrorTopicDeleteDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.vo.ha.mirror.TopicMirrorInfoVO; +import com.xiaojukeji.know.streaming.km.common.constant.ApiPrefix; +import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.ha.mirror.service.MirrorTopicService; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.validation.annotation.Validated; +import org.springframework.web.bind.annotation.*; + +import java.util.List; + +/** + * @author zengqiao + * @date 22/12/12 + */ +@Api(tags = Constant.SWAGGER_API_TAG_PREFIX + "Mirror-Topic-相关接口(REST)") +@RestController +@RequestMapping(ApiPrefix.API_V3_HA_MIRROR_PREFIX) +public class MirrorTopicController { + + @Autowired + private MirrorTopicService mirrorTopicService; + + @ApiOperation(value = "批量创建Topic镜像", notes = "") + @PostMapping(value = "topics") + @ResponseBody + public Result batchCreateMirrorTopic(@Validated @RequestBody List dtoList) { + return mirrorTopicService.batchCreateMirrorTopic(dtoList); + } + + @ApiOperation(value = "批量删除Topic镜像", notes = "") + @DeleteMapping(value = "topics") + @ResponseBody + public Result batchDeleteMirrorTopic(@Validated @RequestBody List dtoList) { + return mirrorTopicService.batchDeleteMirrorTopic(dtoList); + } + + @ApiOperation(value = "Topic镜像信息", notes = "") + @GetMapping(value = "clusters/{clusterPhyId}/topics/{topicName}/mirror-info") + @ResponseBody + public Result> getTopicsMirrorInfo(@PathVariable Long clusterPhyId, + @PathVariable String topicName) { + return mirrorTopicService.getTopicsMirrorInfo(clusterPhyId, topicName); + } +} diff --git a/pom.xml b/pom.xml index e288d0ce..0ba4d00d 100644 --- a/pom.xml +++ b/pom.xml @@ -60,6 +60,7 @@ km-collector km-rest km-dist + km-enterprise/km-ha