From 235c0ed30e3239191257c48b6242afc152810357 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Thu, 9 Feb 2023 16:48:31 +0800 Subject: [PATCH] =?UTF-8?q?[Feature]MM2=E7=AE=A1=E7=90=86-MM2=E7=AE=A1?= =?UTF-8?q?=E7=90=86=E7=9B=B8=E5=85=B3=E5=AE=9E=E4=BD=93=E7=B1=BB(#894)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ClusterMirrorMakersOverviewDTO.java | 12 ++++ .../bean/dto/connect/ClusterConnectorDTO.java | 4 +- .../connect/connector/ConnectorCreateDTO.java | 9 ++- .../connect/mm2/MirrorMaker2ActionDTO.java | 15 ++++ .../connect/mm2/MirrorMaker2DeleteDTO.java | 14 ++++ .../dto/connect/mm2/MirrorMakerCreateDTO.java | 69 +++++++++++++++++++ .../metrices/mm2/MetricsMirrorMakersDTO.java | 23 +++++++ .../bean/entity/connect/ConnectWorker.java | 1 - .../entity/param/connect/ConnectorParam.java | 7 +- .../mm2/ClusterMirrorMakerOverviewVO.java | 52 ++++++++++++++ .../cluster/mm2/MirrorMakerBaseStateVO.java | 25 +++++++ .../vo/cluster/mm2/MirrorMakerBasicVO.java | 16 +++++ .../vo/cluster/mm2/MirrorMakerStateVO.java | 34 +++++++++ .../km/common/constant/MsgConstant.java | 7 ++ .../connect/KafkaConnectConstant.java | 17 +++++ .../km/common/utils/MirrorMakerUtil.java | 11 +++ .../know/streaming/km/common/utils/Tuple.java | 9 ++- 17 files changed, 317 insertions(+), 8 deletions(-) create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/cluster/ClusterMirrorMakersOverviewDTO.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/mm2/MirrorMaker2ActionDTO.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/mm2/MirrorMaker2DeleteDTO.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/mm2/MirrorMakerCreateDTO.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/metrices/mm2/MetricsMirrorMakersDTO.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/mm2/ClusterMirrorMakerOverviewVO.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/mm2/MirrorMakerBaseStateVO.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/mm2/MirrorMakerBasicVO.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/mm2/MirrorMakerStateVO.java create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/MirrorMakerUtil.java diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/cluster/ClusterMirrorMakersOverviewDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/cluster/ClusterMirrorMakersOverviewDTO.java new file mode 100644 index 00000000..c109ebba --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/cluster/ClusterMirrorMakersOverviewDTO.java @@ -0,0 +1,12 @@ +package com.xiaojukeji.know.streaming.km.common.bean.dto.cluster; + +import lombok.Data; + + +/** + * @author zengqiao + * @date 22/12/12 + */ +@Data +public class ClusterMirrorMakersOverviewDTO extends ClusterConnectorsOverviewDTO { +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/ClusterConnectorDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/ClusterConnectorDTO.java index 71cfcce8..bbc84ccf 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/ClusterConnectorDTO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/ClusterConnectorDTO.java @@ -19,11 +19,11 @@ import javax.validation.constraints.NotNull; public class ClusterConnectorDTO extends BaseDTO { @NotNull(message = "connectClusterId不允许为空") @ApiModelProperty(value = "Connector集群ID", example = "1") - private Long connectClusterId; + protected Long connectClusterId; @NotBlank(message = "name不允许为空串") @ApiModelProperty(value = "Connector名称", example = "know-streaming-connector") - private String connectorName; + protected String connectorName; public ClusterConnectorDTO(Long connectClusterId, String connectorName) { this.connectClusterId = connectClusterId; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/connector/ConnectorCreateDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/connector/ConnectorCreateDTO.java index a2272118..46639f0e 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/connector/ConnectorCreateDTO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/connector/ConnectorCreateDTO.java @@ -4,6 +4,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.ClusterConnector import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; +import lombok.NoArgsConstructor; import javax.validation.constraints.NotNull; import java.util.Properties; @@ -13,9 +14,15 @@ import java.util.Properties; * @date 2022-10-17 */ @Data +@NoArgsConstructor @ApiModel(description = "创建Connector") public class ConnectorCreateDTO extends ClusterConnectorDTO { @NotNull(message = "configs不允许为空") @ApiModelProperty(value = "配置", example = "") - private Properties configs; + protected Properties configs; + + public ConnectorCreateDTO(Long connectClusterId, String connectorName, Properties configs) { + super(connectClusterId, connectorName); + this.configs = configs; + } } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/mm2/MirrorMaker2ActionDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/mm2/MirrorMaker2ActionDTO.java new file mode 100644 index 00000000..1d06c6af --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/mm2/MirrorMaker2ActionDTO.java @@ -0,0 +1,15 @@ +package com.xiaojukeji.know.streaming.km.common.bean.dto.connect.mm2; + +import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorActionDTO; +import io.swagger.annotations.ApiModel; +import lombok.Data; + + +/** + * @author zengqiao + * @date 2022-12-12 + */ +@Data +@ApiModel(description = "操作MM2") +public class MirrorMaker2ActionDTO extends ConnectorActionDTO { +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/mm2/MirrorMaker2DeleteDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/mm2/MirrorMaker2DeleteDTO.java new file mode 100644 index 00000000..9b219cfc --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/mm2/MirrorMaker2DeleteDTO.java @@ -0,0 +1,14 @@ +package com.xiaojukeji.know.streaming.km.common.bean.dto.connect.mm2; + +import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorDeleteDTO; +import io.swagger.annotations.ApiModel; +import lombok.Data; + +/** + * @author zengqiao + * @date 2022-12-12 + */ +@Data +@ApiModel(description = "删除MM2") +public class MirrorMaker2DeleteDTO extends ConnectorDeleteDTO { +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/mm2/MirrorMakerCreateDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/mm2/MirrorMakerCreateDTO.java new file mode 100644 index 00000000..fa9867ec --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/connect/mm2/MirrorMakerCreateDTO.java @@ -0,0 +1,69 @@ +package com.xiaojukeji.know.streaming.km.common.bean.dto.connect.mm2; + +import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorCreateDTO; +import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; +import org.apache.kafka.clients.CommonClientConfigs; + +import javax.validation.Valid; +import javax.validation.constraints.NotNull; +import java.util.Properties; + +/** + * @author zengqiao + * @date 2022-12-12 + */ +@Data +@ApiModel(description = "创建MM2") +public class MirrorMakerCreateDTO extends ConnectorCreateDTO { + @NotNull(message = "sourceKafkaClusterId不允许为空") + @ApiModelProperty(value = "源Kafka集群ID", example = "") + private Long sourceKafkaClusterId; + + @Valid + @ApiModelProperty(value = "heartbeat-connector的信息", example = "") + private Properties heartbeatConnectorConfigs; + + @Valid + @ApiModelProperty(value = "checkpoint-connector的信息", example = "") + private Properties checkpointConnectorConfigs; + + public void unifyData(Long sourceKafkaClusterId, String sourceBootstrapServers, Properties sourceKafkaProps, + Long targetKafkaClusterId, String targetBootstrapServers, Properties targetKafkaProps) { + if (sourceKafkaProps == null) { + sourceKafkaProps = new Properties(); + } + + if (targetKafkaProps == null) { + targetKafkaProps = new Properties(); + } + + this.unifyData(this.configs, sourceKafkaClusterId, sourceBootstrapServers, sourceKafkaProps, targetKafkaClusterId, targetBootstrapServers, targetKafkaProps); + + if (heartbeatConnectorConfigs != null) { + this.unifyData(this.heartbeatConnectorConfigs, sourceKafkaClusterId, sourceBootstrapServers, sourceKafkaProps, targetKafkaClusterId, targetBootstrapServers, targetKafkaProps); + } + + if (checkpointConnectorConfigs != null) { + this.unifyData(this.checkpointConnectorConfigs, sourceKafkaClusterId, sourceBootstrapServers, sourceKafkaProps, targetKafkaClusterId, targetBootstrapServers, targetKafkaProps); + } + } + + private void unifyData(Properties dataConfig, + Long sourceKafkaClusterId, String sourceBootstrapServers, Properties sourceKafkaProps, + Long targetKafkaClusterId, String targetBootstrapServers, Properties targetKafkaProps) { + dataConfig.put(KafkaConnectConstant.MIRROR_MAKER_SOURCE_CLUSTER_ALIAS_FIELD_NAME, sourceKafkaClusterId); + dataConfig.put(KafkaConnectConstant.MIRROR_MAKER_SOURCE_CLUSTER_FIELD_NAME + "." + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, sourceBootstrapServers); + for (Object configKey: sourceKafkaProps.keySet()) { + dataConfig.put(KafkaConnectConstant.MIRROR_MAKER_SOURCE_CLUSTER_FIELD_NAME + "." + configKey, sourceKafkaProps.getProperty((String) configKey)); + } + + dataConfig.put(KafkaConnectConstant.MIRROR_MAKER_TARGET_CLUSTER_ALIAS_FIELD_NAME, targetKafkaClusterId); + dataConfig.put(KafkaConnectConstant.MIRROR_MAKER_TARGET_CLUSTER_FIELD_NAME + "." + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, targetBootstrapServers); + for (Object configKey: targetKafkaProps.keySet()) { + dataConfig.put(KafkaConnectConstant.MIRROR_MAKER_TARGET_CLUSTER_FIELD_NAME + "." + configKey, targetKafkaProps.getProperty((String) configKey)); + } + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/metrices/mm2/MetricsMirrorMakersDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/metrices/mm2/MetricsMirrorMakersDTO.java new file mode 100644 index 00000000..512832f9 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/metrices/mm2/MetricsMirrorMakersDTO.java @@ -0,0 +1,23 @@ +package com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.mm2; + +import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.ClusterConnectorDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +import java.util.List; + +/** + * @author didi + */ +@Data +@NoArgsConstructor +@AllArgsConstructor +@ApiModel(description = "MirrorMaker指标查询信息") +public class MetricsMirrorMakersDTO extends MetricDTO { + @ApiModelProperty("MirrorMaker的SourceConnect列表") + private List connectorNameList; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/connect/ConnectWorker.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/connect/ConnectWorker.java index 69a4f747..703a7bc7 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/connect/ConnectWorker.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/connect/ConnectWorker.java @@ -7,7 +7,6 @@ import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; -import java.net.URI; @Data @NoArgsConstructor diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/connect/ConnectorParam.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/connect/ConnectorParam.java index 0f5b0a75..fb923a0c 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/connect/ConnectorParam.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/connect/ConnectorParam.java @@ -1,7 +1,5 @@ package com.xiaojukeji.know.streaming.km.common.bean.entity.param.connect; -import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam; -import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ConnectClusterParam; import lombok.AllArgsConstructor; import lombok.Data; @@ -18,9 +16,12 @@ public class ConnectorParam extends ConnectClusterParam { private String connectorName; - public ConnectorParam(Long connectClusterId, String connectorName) { + private String connectorType; + + public ConnectorParam(Long connectClusterId, String connectorName, String connectorType) { super(connectClusterId); this.connectorName = connectorName; + this.connectorType = connectorType; } } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/mm2/ClusterMirrorMakerOverviewVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/mm2/ClusterMirrorMakerOverviewVO.java new file mode 100644 index 00000000..27a3bd94 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/mm2/ClusterMirrorMakerOverviewVO.java @@ -0,0 +1,52 @@ +package com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics; +import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import java.util.List; + +/** + * 集群MM2信息 + * @author zengqiao + * @date 22/02/23 + */ +@Data +@ApiModel(description = "MM2概览信息") +public class ClusterMirrorMakerOverviewVO extends MirrorMakerBasicVO { + @ApiModelProperty(value = "源Kafka集群Id", example = "1") + private Long sourceKafkaClusterId; + + @ApiModelProperty(value = "源Kafka集群名称", example = "aaa") + private String sourceKafkaClusterName; + + @ApiModelProperty(value = "目标Kafka集群Id", example = "1") + private Long destKafkaClusterId; + + @ApiModelProperty(value = "目标Kafka集群名称", example = "aaa") + private String destKafkaClusterName; + + /** + * @see org.apache.kafka.connect.runtime.AbstractStatus.State + */ + @ApiModelProperty(value = "状态", example = "RUNNING") + private String state; + + @ApiModelProperty(value = "Task数", example = "100") + private Integer taskCount; + + @ApiModelProperty(value = "心跳检测connector", example = "heartbeatConnector") + private String heartbeatConnector; + + @ApiModelProperty(value = "进度确认connector", example = "checkpointConnector") + private String checkpointConnector; + + + @ApiModelProperty(value = "多个指标的当前值, 包括健康分/LogSize等") + private BaseMetrics latestMetrics; + + @ApiModelProperty(value = "多个指标的历史曲线值,包括LogSize/BytesIn等") + private List metricLines; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/mm2/MirrorMakerBaseStateVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/mm2/MirrorMakerBaseStateVO.java new file mode 100644 index 00000000..04aed035 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/mm2/MirrorMakerBaseStateVO.java @@ -0,0 +1,25 @@ +package com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2; + +import com.xiaojukeji.know.streaming.km.common.bean.vo.BaseVO; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +/** + * 集群MM2状态信息 + * @author fengqiongfeng + * @date 22/12/29 + */ +@Data +@ApiModel(description = "集群MM2状态信息") +public class MirrorMakerBaseStateVO extends BaseVO { + + @ApiModelProperty(value = "worker数", example = "1") + private Integer workerCount; + + @ApiModelProperty(value = "总Task数", example = "1") + private Integer totalTaskCount; + + @ApiModelProperty(value = "存活Task数", example = "1") + private Integer aliveTaskCount; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/mm2/MirrorMakerBasicVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/mm2/MirrorMakerBasicVO.java new file mode 100644 index 00000000..177d76eb --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/mm2/MirrorMakerBasicVO.java @@ -0,0 +1,16 @@ +package com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2; + +import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ConnectorBasicVO; +import io.swagger.annotations.ApiModel; +import lombok.Data; + + +/** + * 集群MM2信息 + * @author zengqiao + * @date 22/02/23 + */ +@Data +@ApiModel(description = "MM2基本信息") +public class MirrorMakerBasicVO extends ConnectorBasicVO { +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/mm2/MirrorMakerStateVO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/mm2/MirrorMakerStateVO.java new file mode 100644 index 00000000..e0a1782d --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/vo/cluster/mm2/MirrorMakerStateVO.java @@ -0,0 +1,34 @@ +package com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2; + +import com.xiaojukeji.know.streaming.km.common.bean.vo.BaseVO; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +/** + * 集群MM2状态信息 + * @author zengqiao + * @date 22/12/12 + */ +@Data +@ApiModel(description = "集群MM2状态信息") +public class MirrorMakerStateVO extends BaseVO { + + @ApiModelProperty(value = "MM2数", example = "1") + private Integer mirrorMakerCount; + + @ApiModelProperty(value = "worker数", example = "1") + private Integer workerCount; + + @ApiModelProperty(value = "总Connector数", example = "1") + private Integer totalConnectorCount; + + @ApiModelProperty(value = "存活Connector数", example = "1") + private Integer aliveConnectorCount; + + @ApiModelProperty(value = "总Task数", example = "1") + private Integer totalTaskCount; + + @ApiModelProperty(value = "存活Task数", example = "1") + private Integer aliveTaskCount; +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/MsgConstant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/MsgConstant.java index 768ebddf..2f510a84 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/MsgConstant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/MsgConstant.java @@ -110,4 +110,11 @@ public class MsgConstant { public static String getConnectorBizStr(Long clusterPhyId, String topicName) { return String.format("Connect集群ID:[%d] Connector名称:[%s]", clusterPhyId, topicName); } + + + /**************************************************** Connector ****************************************************/ + + public static String getConnectorNotExist(Long connectClusterId, String connectorName) { + return String.format("Connect集群ID:[%d] Connector名称:[%s] 不存在", connectClusterId, connectorName); + } } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/connect/KafkaConnectConstant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/connect/KafkaConnectConstant.java index 5746bfd9..63612874 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/connect/KafkaConnectConstant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/connect/KafkaConnectConstant.java @@ -10,6 +10,23 @@ public class KafkaConnectConstant { public static final String CONNECTOR_TOPICS_FILED_NAME = "topics"; public static final String CONNECTOR_TOPICS_FILED_ERROR_VALUE = "know-streaming-connect-illegal-value"; + public static final String MIRROR_MAKER_TOPIC_PARTITION_PATTERN = "kafka.connect.mirror:type=MirrorSourceConnector,target=*,topic=*,partition=*"; + + public static final String MIRROR_MAKER_SOURCE_CONNECTOR_TYPE = "org.apache.kafka.connect.mirror.MirrorSourceConnector"; + + public static final String MIRROR_MAKER_HEARTBEAT_CONNECTOR_TYPE = "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector"; + + public static final String MIRROR_MAKER_CHECKPOINT_CONNECTOR_TYPE = "org.apache.kafka.connect.mirror.MirrorCheckpointConnector"; + + public static final String MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME = "target.cluster.bootstrap.servers"; + public static final String MIRROR_MAKER_TARGET_CLUSTER_ALIAS_FIELD_NAME = "target.cluster.alias"; + public static final String MIRROR_MAKER_TARGET_CLUSTER_FIELD_NAME = "target.cluster"; + + public static final String MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME = "source.cluster.bootstrap.servers"; + public static final String MIRROR_MAKER_SOURCE_CLUSTER_ALIAS_FIELD_NAME = "source.cluster.alias"; + public static final String MIRROR_MAKER_SOURCE_CLUSTER_FIELD_NAME = "source.cluster"; + + public static final String MIRROR_MAKER_NAME_FIELD_NAME = "name"; private KafkaConnectConstant() { } } \ No newline at end of file diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/MirrorMakerUtil.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/MirrorMakerUtil.java new file mode 100644 index 00000000..d8145832 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/MirrorMakerUtil.java @@ -0,0 +1,11 @@ +package com.xiaojukeji.know.streaming.km.common.utils; + +public class MirrorMakerUtil { + public static String genCheckpointName(String sourceName) { + return sourceName == null? "-checkpoint": sourceName + "-checkpoint"; + } + + public static String genHeartbeatName(String sourceName) { + return sourceName == null? "-heartbeat": sourceName + "-heartbeat"; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/Tuple.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/Tuple.java index 405845a1..c4984604 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/Tuple.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/utils/Tuple.java @@ -12,7 +12,6 @@ import lombok.Data; @JsonIgnoreProperties(value = { "hibernateLazyInitializer", "handler" }) @Data public class Tuple { - private T v1; private V v2; @@ -58,4 +57,12 @@ public class Tuple { result = 31 * result + (v2 != null ? v2.hashCode() : 0); return result; } + + @Override + public String toString() { + return "Tuple{" + + "v1=" + v1 + + ", v2=" + v2 + + '}'; + } }