[Feature]MM2管理-MM2管理相关实体类(#894)

This commit is contained in:
zengqiao
2023-02-09 16:48:31 +08:00
committed by EricZeng
parent 5bd93aa478
commit 235c0ed30e
17 changed files with 317 additions and 8 deletions

View File

@@ -0,0 +1,12 @@
package com.xiaojukeji.know.streaming.km.common.bean.dto.cluster;
import lombok.Data;
/**
* @author zengqiao
* @date 22/12/12
*/
@Data
public class ClusterMirrorMakersOverviewDTO extends ClusterConnectorsOverviewDTO {
}

View File

@@ -19,11 +19,11 @@ import javax.validation.constraints.NotNull;
public class ClusterConnectorDTO extends BaseDTO { public class ClusterConnectorDTO extends BaseDTO {
@NotNull(message = "connectClusterId不允许为空") @NotNull(message = "connectClusterId不允许为空")
@ApiModelProperty(value = "Connector集群ID", example = "1") @ApiModelProperty(value = "Connector集群ID", example = "1")
private Long connectClusterId; protected Long connectClusterId;
@NotBlank(message = "name不允许为空串") @NotBlank(message = "name不允许为空串")
@ApiModelProperty(value = "Connector名称", example = "know-streaming-connector") @ApiModelProperty(value = "Connector名称", example = "know-streaming-connector")
private String connectorName; protected String connectorName;
public ClusterConnectorDTO(Long connectClusterId, String connectorName) { public ClusterConnectorDTO(Long connectClusterId, String connectorName) {
this.connectClusterId = connectClusterId; this.connectClusterId = connectClusterId;

View File

@@ -4,6 +4,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.ClusterConnector
import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty; import io.swagger.annotations.ApiModelProperty;
import lombok.Data; import lombok.Data;
import lombok.NoArgsConstructor;
import javax.validation.constraints.NotNull; import javax.validation.constraints.NotNull;
import java.util.Properties; import java.util.Properties;
@@ -13,9 +14,15 @@ import java.util.Properties;
* @date 2022-10-17 * @date 2022-10-17
*/ */
@Data @Data
@NoArgsConstructor
@ApiModel(description = "创建Connector") @ApiModel(description = "创建Connector")
public class ConnectorCreateDTO extends ClusterConnectorDTO { public class ConnectorCreateDTO extends ClusterConnectorDTO {
@NotNull(message = "configs不允许为空") @NotNull(message = "configs不允许为空")
@ApiModelProperty(value = "配置", example = "") @ApiModelProperty(value = "配置", example = "")
private Properties configs; protected Properties configs;
public ConnectorCreateDTO(Long connectClusterId, String connectorName, Properties configs) {
super(connectClusterId, connectorName);
this.configs = configs;
}
} }

View File

@@ -0,0 +1,15 @@
package com.xiaojukeji.know.streaming.km.common.bean.dto.connect.mm2;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorActionDTO;
import io.swagger.annotations.ApiModel;
import lombok.Data;
/**
* @author zengqiao
* @date 2022-12-12
*/
@Data
@ApiModel(description = "操作MM2")
public class MirrorMaker2ActionDTO extends ConnectorActionDTO {
}

View File

@@ -0,0 +1,14 @@
package com.xiaojukeji.know.streaming.km.common.bean.dto.connect.mm2;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorDeleteDTO;
import io.swagger.annotations.ApiModel;
import lombok.Data;
/**
* @author zengqiao
* @date 2022-12-12
*/
@Data
@ApiModel(description = "删除MM2")
public class MirrorMaker2DeleteDTO extends ConnectorDeleteDTO {
}

View File

@@ -0,0 +1,69 @@
package com.xiaojukeji.know.streaming.km.common.bean.dto.connect.mm2;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.connector.ConnectorCreateDTO;
import com.xiaojukeji.know.streaming.km.common.constant.connect.KafkaConnectConstant;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import org.apache.kafka.clients.CommonClientConfigs;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import java.util.Properties;
/**
* @author zengqiao
* @date 2022-12-12
*/
@Data
@ApiModel(description = "创建MM2")
public class MirrorMakerCreateDTO extends ConnectorCreateDTO {
@NotNull(message = "sourceKafkaClusterId不允许为空")
@ApiModelProperty(value = "源Kafka集群ID", example = "")
private Long sourceKafkaClusterId;
@Valid
@ApiModelProperty(value = "heartbeat-connector的信息", example = "")
private Properties heartbeatConnectorConfigs;
@Valid
@ApiModelProperty(value = "checkpoint-connector的信息", example = "")
private Properties checkpointConnectorConfigs;
public void unifyData(Long sourceKafkaClusterId, String sourceBootstrapServers, Properties sourceKafkaProps,
Long targetKafkaClusterId, String targetBootstrapServers, Properties targetKafkaProps) {
if (sourceKafkaProps == null) {
sourceKafkaProps = new Properties();
}
if (targetKafkaProps == null) {
targetKafkaProps = new Properties();
}
this.unifyData(this.configs, sourceKafkaClusterId, sourceBootstrapServers, sourceKafkaProps, targetKafkaClusterId, targetBootstrapServers, targetKafkaProps);
if (heartbeatConnectorConfigs != null) {
this.unifyData(this.heartbeatConnectorConfigs, sourceKafkaClusterId, sourceBootstrapServers, sourceKafkaProps, targetKafkaClusterId, targetBootstrapServers, targetKafkaProps);
}
if (checkpointConnectorConfigs != null) {
this.unifyData(this.checkpointConnectorConfigs, sourceKafkaClusterId, sourceBootstrapServers, sourceKafkaProps, targetKafkaClusterId, targetBootstrapServers, targetKafkaProps);
}
}
private void unifyData(Properties dataConfig,
Long sourceKafkaClusterId, String sourceBootstrapServers, Properties sourceKafkaProps,
Long targetKafkaClusterId, String targetBootstrapServers, Properties targetKafkaProps) {
dataConfig.put(KafkaConnectConstant.MIRROR_MAKER_SOURCE_CLUSTER_ALIAS_FIELD_NAME, sourceKafkaClusterId);
dataConfig.put(KafkaConnectConstant.MIRROR_MAKER_SOURCE_CLUSTER_FIELD_NAME + "." + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, sourceBootstrapServers);
for (Object configKey: sourceKafkaProps.keySet()) {
dataConfig.put(KafkaConnectConstant.MIRROR_MAKER_SOURCE_CLUSTER_FIELD_NAME + "." + configKey, sourceKafkaProps.getProperty((String) configKey));
}
dataConfig.put(KafkaConnectConstant.MIRROR_MAKER_TARGET_CLUSTER_ALIAS_FIELD_NAME, targetKafkaClusterId);
dataConfig.put(KafkaConnectConstant.MIRROR_MAKER_TARGET_CLUSTER_FIELD_NAME + "." + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, targetBootstrapServers);
for (Object configKey: targetKafkaProps.keySet()) {
dataConfig.put(KafkaConnectConstant.MIRROR_MAKER_TARGET_CLUSTER_FIELD_NAME + "." + configKey, targetKafkaProps.getProperty((String) configKey));
}
}
}

View File

@@ -0,0 +1,23 @@
package com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.mm2;
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.ClusterConnectorDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.metrices.MetricDTO;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
/**
* @author didi
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@ApiModel(description = "MirrorMaker指标查询信息")
public class MetricsMirrorMakersDTO extends MetricDTO {
@ApiModelProperty("MirrorMaker的SourceConnect列表")
private List<ClusterConnectorDTO> connectorNameList;
}

View File

@@ -7,7 +7,6 @@ import lombok.Data;
import lombok.NoArgsConstructor; import lombok.NoArgsConstructor;
import java.io.Serializable; import java.io.Serializable;
import java.net.URI;
@Data @Data
@NoArgsConstructor @NoArgsConstructor

View File

@@ -1,7 +1,5 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.param.connect; package com.xiaojukeji.know.streaming.km.common.bean.entity.param.connect;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ConnectClusterParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ConnectClusterParam;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Data; import lombok.Data;
@@ -18,9 +16,12 @@ public class ConnectorParam extends ConnectClusterParam {
private String connectorName; private String connectorName;
public ConnectorParam(Long connectClusterId, String connectorName) { private String connectorType;
public ConnectorParam(Long connectClusterId, String connectorName, String connectorType) {
super(connectClusterId); super(connectClusterId);
this.connectorName = connectorName; this.connectorName = connectorName;
this.connectorType = connectorType;
} }
} }

View File

@@ -0,0 +1,52 @@
package com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2;
import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.BaseMetrics;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.line.MetricLineVO;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import java.util.List;
/**
* 集群MM2信息
* @author zengqiao
* @date 22/02/23
*/
@Data
@ApiModel(description = "MM2概览信息")
public class ClusterMirrorMakerOverviewVO extends MirrorMakerBasicVO {
@ApiModelProperty(value = "源Kafka集群Id", example = "1")
private Long sourceKafkaClusterId;
@ApiModelProperty(value = "源Kafka集群名称", example = "aaa")
private String sourceKafkaClusterName;
@ApiModelProperty(value = "目标Kafka集群Id", example = "1")
private Long destKafkaClusterId;
@ApiModelProperty(value = "目标Kafka集群名称", example = "aaa")
private String destKafkaClusterName;
/**
* @see org.apache.kafka.connect.runtime.AbstractStatus.State
*/
@ApiModelProperty(value = "状态", example = "RUNNING")
private String state;
@ApiModelProperty(value = "Task数", example = "100")
private Integer taskCount;
@ApiModelProperty(value = "心跳检测connector", example = "heartbeatConnector")
private String heartbeatConnector;
@ApiModelProperty(value = "进度确认connector", example = "checkpointConnector")
private String checkpointConnector;
@ApiModelProperty(value = "多个指标的当前值, 包括健康分/LogSize等")
private BaseMetrics latestMetrics;
@ApiModelProperty(value = "多个指标的历史曲线值包括LogSize/BytesIn等")
private List<MetricLineVO> metricLines;
}

View File

@@ -0,0 +1,25 @@
package com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2;
import com.xiaojukeji.know.streaming.km.common.bean.vo.BaseVO;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
* 集群MM2状态信息
* @author fengqiongfeng
* @date 22/12/29
*/
@Data
@ApiModel(description = "集群MM2状态信息")
public class MirrorMakerBaseStateVO extends BaseVO {
@ApiModelProperty(value = "worker数", example = "1")
private Integer workerCount;
@ApiModelProperty(value = "总Task数", example = "1")
private Integer totalTaskCount;
@ApiModelProperty(value = "存活Task数", example = "1")
private Integer aliveTaskCount;
}

View File

@@ -0,0 +1,16 @@
package com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2;
import com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.connector.ConnectorBasicVO;
import io.swagger.annotations.ApiModel;
import lombok.Data;
/**
* 集群MM2信息
* @author zengqiao
* @date 22/02/23
*/
@Data
@ApiModel(description = "MM2基本信息")
public class MirrorMakerBasicVO extends ConnectorBasicVO {
}

View File

@@ -0,0 +1,34 @@
package com.xiaojukeji.know.streaming.km.common.bean.vo.cluster.mm2;
import com.xiaojukeji.know.streaming.km.common.bean.vo.BaseVO;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
* 集群MM2状态信息
* @author zengqiao
* @date 22/12/12
*/
@Data
@ApiModel(description = "集群MM2状态信息")
public class MirrorMakerStateVO extends BaseVO {
@ApiModelProperty(value = "MM2数", example = "1")
private Integer mirrorMakerCount;
@ApiModelProperty(value = "worker数", example = "1")
private Integer workerCount;
@ApiModelProperty(value = "总Connector数", example = "1")
private Integer totalConnectorCount;
@ApiModelProperty(value = "存活Connector数", example = "1")
private Integer aliveConnectorCount;
@ApiModelProperty(value = "总Task数", example = "1")
private Integer totalTaskCount;
@ApiModelProperty(value = "存活Task数", example = "1")
private Integer aliveTaskCount;
}

View File

@@ -110,4 +110,11 @@ public class MsgConstant {
public static String getConnectorBizStr(Long clusterPhyId, String topicName) { public static String getConnectorBizStr(Long clusterPhyId, String topicName) {
return String.format("Connect集群ID:[%d] Connector名称:[%s]", clusterPhyId, topicName); return String.format("Connect集群ID:[%d] Connector名称:[%s]", clusterPhyId, topicName);
} }
/**************************************************** Connector ****************************************************/
public static String getConnectorNotExist(Long connectClusterId, String connectorName) {
return String.format("Connect集群ID:[%d] Connector名称:[%s] 不存在", connectClusterId, connectorName);
}
} }

View File

@@ -10,6 +10,23 @@ public class KafkaConnectConstant {
public static final String CONNECTOR_TOPICS_FILED_NAME = "topics"; public static final String CONNECTOR_TOPICS_FILED_NAME = "topics";
public static final String CONNECTOR_TOPICS_FILED_ERROR_VALUE = "know-streaming-connect-illegal-value"; public static final String CONNECTOR_TOPICS_FILED_ERROR_VALUE = "know-streaming-connect-illegal-value";
public static final String MIRROR_MAKER_TOPIC_PARTITION_PATTERN = "kafka.connect.mirror:type=MirrorSourceConnector,target=*,topic=*,partition=*";
public static final String MIRROR_MAKER_SOURCE_CONNECTOR_TYPE = "org.apache.kafka.connect.mirror.MirrorSourceConnector";
public static final String MIRROR_MAKER_HEARTBEAT_CONNECTOR_TYPE = "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector";
public static final String MIRROR_MAKER_CHECKPOINT_CONNECTOR_TYPE = "org.apache.kafka.connect.mirror.MirrorCheckpointConnector";
public static final String MIRROR_MAKER_TARGET_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME = "target.cluster.bootstrap.servers";
public static final String MIRROR_MAKER_TARGET_CLUSTER_ALIAS_FIELD_NAME = "target.cluster.alias";
public static final String MIRROR_MAKER_TARGET_CLUSTER_FIELD_NAME = "target.cluster";
public static final String MIRROR_MAKER_SOURCE_CLUSTER_BOOTSTRAP_SERVERS_FIELD_NAME = "source.cluster.bootstrap.servers";
public static final String MIRROR_MAKER_SOURCE_CLUSTER_ALIAS_FIELD_NAME = "source.cluster.alias";
public static final String MIRROR_MAKER_SOURCE_CLUSTER_FIELD_NAME = "source.cluster";
public static final String MIRROR_MAKER_NAME_FIELD_NAME = "name";
private KafkaConnectConstant() { private KafkaConnectConstant() {
} }
} }

View File

@@ -0,0 +1,11 @@
package com.xiaojukeji.know.streaming.km.common.utils;
public class MirrorMakerUtil {
public static String genCheckpointName(String sourceName) {
return sourceName == null? "-checkpoint": sourceName + "-checkpoint";
}
public static String genHeartbeatName(String sourceName) {
return sourceName == null? "-heartbeat": sourceName + "-heartbeat";
}
}

View File

@@ -12,7 +12,6 @@ import lombok.Data;
@JsonIgnoreProperties(value = { "hibernateLazyInitializer", "handler" }) @JsonIgnoreProperties(value = { "hibernateLazyInitializer", "handler" })
@Data @Data
public class Tuple<T, V> { public class Tuple<T, V> {
private T v1; private T v1;
private V v2; private V v2;
@@ -58,4 +57,12 @@ public class Tuple<T, V> {
result = 31 * result + (v2 != null ? v2.hashCode() : 0); result = 31 * result + (v2 != null ? v2.hashCode() : 0);
return result; return result;
} }
@Override
public String toString() {
return "Tuple{" +
"v1=" + v1 +
", v2=" + v2 +
'}';
}
} }