diff --git a/distribution/conf/create_mysql_table.sql b/distribution/conf/create_mysql_table.sql index f859d752..8da67ca4 100644 --- a/distribution/conf/create_mysql_table.sql +++ b/distribution/conf/create_mysql_table.sql @@ -591,4 +591,7 @@ CREATE TABLE `work_order` ( `gmt_create` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间', `gmt_modify` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间', PRIMARY KEY (`id`) -) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='工单表'; \ No newline at end of file +) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='工单表'; + + +ALTER TABLE `topic_connections` ADD COLUMN `client_id` VARCHAR(1024) NOT NULL DEFAULT '' COMMENT '客户端ID' AFTER `client_version`; diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/ha/HaResTypeEnum.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/ha/HaResTypeEnum.java index 409758c2..75ee8fe0 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/ha/HaResTypeEnum.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/bizenum/ha/HaResTypeEnum.java @@ -9,9 +9,13 @@ import lombok.Getter; @Getter public enum HaResTypeEnum { CLUSTER(0, "Cluster"), + TOPIC(1, "Topic"), + KAFKA_USER(2, "KafkaUser"), + KAFKA_USER_AND_CLIENT(3, "KafkaUserAndClient"), + ; private final int code; @@ -22,4 +26,4 @@ public enum HaResTypeEnum { this.code = code; this.msg = msg; } -} \ No newline at end of file +} diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ConfigConstant.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ConfigConstant.java index 17f20223..e1ec58d0 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ConfigConstant.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/constant/ConfigConstant.java @@ -33,6 +33,8 @@ public class ConfigConstant { public static final String HA_SWITCH_JOB_TIMEOUT_UNIT_SEC_CONFIG_PREFIX = "HA_SWITCH_JOB_TIMEOUT_UNIT_SEC_CONFIG_CLUSTER"; + public static final String HA_CONNECTION_ACTIVE_TIME_UNIT_MIN = "HA_CONNECTION_ACTIVE_TIME_UNIT_MIN"; + private ConfigConstant() { } } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicConnection.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicConnection.java index abb40327..73492f78 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicConnection.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/ao/topic/TopicConnection.java @@ -1,9 +1,12 @@ package com.xiaojukeji.kafka.manager.common.entity.ao.topic; +import lombok.Data; + /** * @author zengqiao * @date 20/4/20 */ +@Data public class TopicConnection { private Long clusterId; @@ -19,72 +22,9 @@ public class TopicConnection { private String clientVersion; - public Long getClusterId() { - return clusterId; - } + private String clientId; - public void setClusterId(Long clusterId) { - this.clusterId = clusterId; - } + private Long realConnectTime; - public String getTopicName() { - return topicName; - } - - public void setTopicName(String topicName) { - this.topicName = topicName; - } - - public String getAppId() { - return appId; - } - - public void setAppId(String appId) { - this.appId = appId; - } - - public String 
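// Note: the hand-written accessors being removed here are superseded by Lombok's @Data on
// this class, which also generates equivalents for the new fields; a hypothetical caller:
//   TopicConnection conn = new TopicConnection();
//   conn.setClientId("P#orders");  conn.getRealConnectTime();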
getIp() { - return ip; - } - - public void setIp(String ip) { - this.ip = ip; - } - - public String getHostname() { - return hostname; - } - - public void setHostname(String hostname) { - this.hostname = hostname; - } - - public String getClientType() { - return clientType; - } - - public void setClientType(String clientType) { - this.clientType = clientType; - } - - public String getClientVersion() { - return clientVersion; - } - - public void setClientVersion(String clientVersion) { - this.clientVersion = clientVersion; - } - - @Override - public String toString() { - return "TopicConnectionDTO{" + - "clusterId=" + clusterId + - ", topicName='" + topicName + '\'' + - ", appId='" + appId + '\'' + - ", ip='" + ip + '\'' + - ", hostname='" + hostname + '\'' + - ", clientType='" + clientType + '\'' + - ", clientVersion='" + clientVersion + '\'' + - '}'; - } + private Long createTime; } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/ha/ASSwitchJobActionDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/ha/ASSwitchJobActionDTO.java index 1f1d41c6..acb69967 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/ha/ASSwitchJobActionDTO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/ha/ASSwitchJobActionDTO.java @@ -15,12 +15,4 @@ public class ASSwitchJobActionDTO { @NotBlank(message = "action不允许为空") @ApiModelProperty(value = "动作, force") private String action; - -// @NotNull(message = "all不允许为NULL") -// @ApiModelProperty(value = "所有的Topic") -// private Boolean allJumpWaitInSync; -// -// @NotNull(message = "jumpWaitInSyncActiveTopicList不允许为NULL") -// @ApiModelProperty(value = "操作的Topic") -// private List jumpWaitInSyncActiveTopicList; } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/ha/ASSwitchJobDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/ha/ASSwitchJobDTO.java index 8c4ae0dc..5a62ad0a 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/ha/ASSwitchJobDTO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/ha/ASSwitchJobDTO.java @@ -4,6 +4,7 @@ import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; +import javax.validation.Valid; import javax.validation.constraints.NotNull; import java.util.List; @@ -27,5 +28,13 @@ public class ASSwitchJobDTO { private Long standbyClusterPhyId; @NotNull(message = "topicNameList不允许为NULL") + @ApiModelProperty(value="切换的Topic名称列表") private List topicNameList; + + /** + * kafkaUser+Client列表 + */ + @Valid + @ApiModelProperty(value="切换的KafkaUser&ClientId列表,Client可以为空串") + private List kafkaUserAndClientIdList; } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/ha/KafkaUserAndClientDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/ha/KafkaUserAndClientDTO.java new file mode 100644 index 00000000..ec82cf00 --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/ha/KafkaUserAndClientDTO.java @@ -0,0 +1,18 @@ +package com.xiaojukeji.kafka.manager.common.entity.dto.ha; + +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import 
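// Validation sketch: only kafkaUser carries @NotBlank below; clientId is optional and may
// be an empty string (ASSwitchJobDTO documents the clientId as "可以为空串"), e.g. a
// hypothetical payload: {"kafkaUser": "userA", "clientId": ""}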
javax.validation.constraints.NotBlank; + +@Data +@ApiModel(description="KafkaUser和ClientId信息") +public class KafkaUserAndClientDTO { + @NotBlank(message = "kafkaUser不允许为空串") + @ApiModelProperty(value = "kafkaUser") + private String kafkaUser; + + @ApiModelProperty(value = "clientId") + private String clientId; +} diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/op/topic/HaTopicRelationDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/op/topic/HaTopicRelationDTO.java index d6aea1e5..16ebe7a9 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/op/topic/HaTopicRelationDTO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/op/topic/HaTopicRelationDTO.java @@ -32,6 +32,9 @@ public class HaTopicRelationDTO { @ApiModelProperty(value = "需要关联|解绑的topic名称列表") private List topicNames; + @ApiModelProperty(value = "解绑是否保留备集群资源(topic,kafkaUser,group)") + private Boolean retainStandbyResource; + @Override public String toString() { return "HaTopicRelationDTO{" + @@ -39,6 +42,7 @@ public class HaTopicRelationDTO { ", standbyClusterId=" + standbyClusterId + ", all=" + all + ", topicNames=" + topicNames + + ", retainStandbyResource=" + retainStandbyResource + '}'; } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/AppRelateTopicsDTO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/AppRelateTopicsDTO.java index bc49f136..f7ba6bc8 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/AppRelateTopicsDTO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/dto/rd/AppRelateTopicsDTO.java @@ -21,4 +21,11 @@ public class AppRelateTopicsDTO { @NotNull(message = "filterTopicNameList不允许为NULL") @ApiModelProperty(value="过滤的Topic列表") private List filterTopicNameList; + + @ApiModelProperty(value="使用KafkaUser+Client维度的数据,默认是kafkaUser维度") + private Boolean useKafkaUserAndClientId; + + @NotNull(message = "ha不允许为NULL") + @ApiModelProperty(value="查询是否高可用topic") + private Boolean ha; } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/gateway/TopicConnectionDO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/gateway/TopicConnectionDO.java index 4707d271..fe192af9 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/gateway/TopicConnectionDO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/gateway/TopicConnectionDO.java @@ -1,5 +1,7 @@ package com.xiaojukeji.kafka.manager.common.entity.pojo.gateway; +import lombok.Data; + import java.util.Date; /** @@ -7,6 +9,7 @@ import java.util.Date; * @author zengqiao * @date 20/7/6 */ +@Data public class TopicConnectionDO { private Long id; @@ -22,87 +25,13 @@ public class TopicConnectionDO { private String clientVersion; + private String clientId; + + private Long realConnectTime; + private Date createTime; - public Long getId() { - return id; - } - - public void setId(Long id) { - this.id = id; - } - - public Long getClusterId() { - return clusterId; - } - - public void setClusterId(Long clusterId) { - this.clusterId = clusterId; - } - - public String getTopicName() { - return topicName; - } - - public void setTopicName(String topicName) { - 
this.topicName = topicName; - } - - public String getType() { - return type; - } - - public void setType(String type) { - this.type = type; - } - - public String getAppId() { - return appId; - } - - public void setAppId(String appId) { - this.appId = appId; - } - - public String getIp() { - return ip; - } - - public void setIp(String ip) { - this.ip = ip; - } - - public String getClientVersion() { - return clientVersion; - } - - public void setClientVersion(String clientVersion) { - this.clientVersion = clientVersion; - } - - public Date getCreateTime() { - return createTime; - } - - public void setCreateTime(Date createTime) { - this.createTime = createTime; - } - - @Override - public String toString() { - return "TopicConnectionDO{" + - "id=" + id + - ", clusterId=" + clusterId + - ", topicName='" + topicName + '\'' + - ", type='" + type + '\'' + - ", appId='" + appId + '\'' + - ", ip='" + ip + '\'' + - ", clientVersion='" + clientVersion + '\'' + - ", createTime=" + createTime + - '}'; - } - public String uniqueKey() { - return appId + clusterId + topicName + type + ip; + return appId + clusterId + topicName + type + ip + clientId; } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/ha/HaASRelationDO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/ha/HaASRelationDO.java index a55d5d00..84c78bdc 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/ha/HaASRelationDO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/ha/HaASRelationDO.java @@ -1,6 +1,7 @@ package com.xiaojukeji.kafka.manager.common.entity.pojo.ha; import com.baomidou.mybatisplus.annotation.TableName; +import com.xiaojukeji.kafka.manager.common.bizenum.ha.HaResTypeEnum; import com.xiaojukeji.kafka.manager.common.entity.pojo.BaseDO; import lombok.AllArgsConstructor; import lombok.Data; @@ -37,6 +38,7 @@ public class HaASRelationDO extends BaseDO { /** * 资源类型 + * @see HaResTypeEnum */ private Integer resType; diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/ha/HaASSwitchJobDO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/ha/HaASSwitchJobDO.java index d68c4f88..1337ccb7 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/ha/HaASSwitchJobDO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/pojo/ha/HaASSwitchJobDO.java @@ -1,10 +1,16 @@ package com.xiaojukeji.kafka.manager.common.entity.pojo.ha; import com.baomidou.mybatisplus.annotation.TableName; +import com.xiaojukeji.kafka.manager.common.entity.dto.ha.KafkaUserAndClientDTO; import com.xiaojukeji.kafka.manager.common.entity.pojo.BaseDO; +import com.xiaojukeji.kafka.manager.common.utils.ConvertUtil; +import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import lombok.Data; import lombok.NoArgsConstructor; +import java.util.ArrayList; +import java.util.List; + /** * HA-主备关系切换任务表 @@ -28,15 +34,35 @@ public class HaASSwitchJobDO extends BaseDO { */ private Integer jobStatus; + /** + * 类型,0:kafkaUser 1:kafkaUser+Client + */ + private Integer type; + + /** + * 扩展数据 + */ + private String extendData; + /** * 操作人 */ private String operator; - public HaASSwitchJobDO(Long activeClusterPhyId, Long standbyClusterPhyId, Integer jobStatus, String operator) { + public HaASSwitchJobDO(Long activeClusterPhyId, 
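// Constructor sketch (per the field comments above): type 0 = kafkaUser dimension,
// 1 = kafkaUser+Client; extendDataObj is JSON-serialized into extendData, e.g. a
// hypothetical value: [{"kafkaUser":"userA","clientId":"P#orders"}]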
Long standbyClusterPhyId, Integer type, List extendDataObj, Integer jobStatus, String operator) { this.activeClusterPhyId = activeClusterPhyId; this.standbyClusterPhyId = standbyClusterPhyId; + this.type = type; + this.extendData = ValidateUtils.isEmptyList(extendDataObj)? "": ConvertUtil.obj2Json(extendDataObj); this.jobStatus = jobStatus; this.operator = operator; } + + public List getExtendRawData() { + if (ValidateUtils.isBlank(extendData)) { + return new ArrayList<>(); + } + + return ConvertUtil.str2ObjArrayByJson(extendData, KafkaUserAndClientDTO.class); + } } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicConnectionVO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicConnectionVO.java index e6d65b91..58686a75 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicConnectionVO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/normal/topic/TopicConnectionVO.java @@ -2,11 +2,13 @@ package com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; +import lombok.Data; /** * @author zhongyuankai,zengqiao * @date 20/4/8 */ +@Data @ApiModel(value = "Topic连接信息") public class TopicConnectionVO { @ApiModelProperty(value = "集群ID") @@ -30,72 +32,12 @@ public class TopicConnectionVO { @ApiModelProperty(value = "客户端版本") private String clientVersion; - public Long getClusterId() { - return clusterId; - } + @ApiModelProperty(value = "客户端ID") + private String clientId; - public void setClusterId(Long clusterId) { - this.clusterId = clusterId; - } + @ApiModelProperty(value = "连接Broker时间") + private Long realConnectTime; - public String getTopicName() { - return topicName; - } - - public void setTopicName(String topicName) { - this.topicName = topicName; - } - - public String getAppId() { - return appId; - } - - public void setAppId(String appId) { - this.appId = appId; - } - - public String getIp() { - return ip; - } - - public void setIp(String ip) { - this.ip = ip; - } - - public String getHostname() { - return hostname; - } - - public void setHostname(String hostname) { - this.hostname = hostname; - } - - public String getClientType() { - return clientType; - } - - public void setClientType(String clientType) { - this.clientType = clientType; - } - - public String getClientVersion() { - return clientVersion; - } - - public void setClientVersion(String clientVersion) { - this.clientVersion = clientVersion; - } - - @Override - public String toString() { - return "TopicConnectionVO{" + - "clusterId=" + clusterId + - ", topicName='" + topicName + '\'' + - ", appId='" + appId + '\'' + - ", ip='" + ip + '\'' + - ", hostname='" + hostname + '\'' + - ", clientType='" + clientType + '\'' + - ", clientVersion='" + clientVersion + '\'' + - '}'; - } + @ApiModelProperty(value = "创建时间") + private Long createTime; } diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/app/AppRelateTopicsVO.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/app/AppRelateTopicsVO.java index 1ebe57a4..19c41ed1 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/app/AppRelateTopicsVO.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/entity/vo/rd/app/AppRelateTopicsVO.java 
@@ -3,7 +3,9 @@ package com.xiaojukeji.kafka.manager.common.entity.vo.rd.app; import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import lombok.Data; +import lombok.NoArgsConstructor; +import java.util.ArrayList; import java.util.List; /** @@ -11,6 +13,7 @@ import java.util.List; * @date 20/5/4 */ @Data +@NoArgsConstructor @ApiModel(description="App关联Topic信息") public class AppRelateTopicsVO { @ApiModelProperty(value="物理集群ID") @@ -19,6 +22,12 @@ public class AppRelateTopicsVO { @ApiModelProperty(value="kafkaUser") private String kafkaUser; + @ApiModelProperty(value="clientId") + private String clientId; + + @ApiModelProperty(value="已建立HA的Client") + private List<String> haClientIdList; + @ApiModelProperty(value="选中的Topic列表") private List<String> selectedTopicNameList; @@ -27,4 +36,37 @@ public class AppRelateTopicsVO { @ApiModelProperty(value="未建立HA的Topic列表") private List<String> notHaTopicNameList; + + public AppRelateTopicsVO(Long clusterPhyId, String kafkaUser, String clientId) { + this.clusterPhyId = clusterPhyId; + this.kafkaUser = kafkaUser; + this.clientId = clientId; + this.selectedTopicNameList = new ArrayList<>(); + this.notSelectTopicNameList = new ArrayList<>(); + this.notHaTopicNameList = new ArrayList<>(); + } + + public void addSelectedIfNotExist(String topicName) { + if (selectedTopicNameList.contains(topicName)) { + return; + } + + selectedTopicNameList.add(topicName); + } + + public void addNotSelectedIfNotExist(String topicName) { + if (notSelectTopicNameList.contains(topicName)) { + return; + } + + notSelectTopicNameList.add(topicName); + } + + public void addNotHaIfNotExist(String topicName) { + if (notHaTopicNameList.contains(topicName)) { + return; + } + + notHaTopicNameList.add(topicName); + } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/HAUtils.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/HAUtils.java new file mode 100644 index 00000000..d8ee744f --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/HAUtils.java @@ -0,0 +1,29 @@ +package com.xiaojukeji.kafka.manager.common.utils; + +public class HAUtils { + public static String mergeKafkaUserAndClient(String kafkaUser, String clientId) { + if (ValidateUtils.isBlank(clientId)) { + return kafkaUser; + } + + return String.format("%s#%s", kafkaUser, clientId); + } + + public static Tuple<String, String> splitKafkaUserAndClient(String kafkaUserAndClientId) { + if (ValidateUtils.isBlank(kafkaUserAndClientId)) { + return null; + } + + int idx = kafkaUserAndClientId.indexOf('#'); + if (idx == -1) { + return null; + } else if (idx == kafkaUserAndClientId.length() - 1) { + return new Tuple<>(kafkaUserAndClientId.substring(0, idx), ""); + } + + return new Tuple<>(kafkaUserAndClientId.substring(0, idx), kafkaUserAndClientId.substring(idx + 1)); + } + + private HAUtils() { + } +} diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java index 283d59c5..175865fa 100644 --- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/JsonUtils.java @@ -79,10 +79,27 @@ public class JsonUtils { TopicConnectionDO connectionDO = new TopicConnectionDO(); String[] appIdDetailArray = appIdDetail.toString().split("#"); - if 
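// Assumed report format implied by the parsing below (indices 0..2, then a '#'-joined
// clientId, then an epoch-millis timestamp as the last segment); example values are
// hypothetical: "appId-001#10.1.1.2#2.4.0#P#orders#1668000000000" parses to
//   appId="appId-001", ip="10.1.1.2", clientVersion="2.4.0",
//   clientId="P#orders", realConnectTime=1668000000000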
(appIdDetailArray.length >= 3) { - connectionDO.setAppId(appIdDetailArray[0]); - connectionDO.setIp(appIdDetailArray[1]); - connectionDO.setClientVersion(appIdDetailArray[2]); + if (appIdDetailArray == null) { + appIdDetailArray = new String[0]; + } + + connectionDO.setAppId(parseTopicConnections(appIdDetailArray, 0)); + connectionDO.setIp(parseTopicConnections(appIdDetailArray, 1)); + connectionDO.setClientVersion(parseTopicConnections(appIdDetailArray, 2)); + + // 解析clientId + StringBuilder sb = new StringBuilder(); + for (int i = 3; i < appIdDetailArray.length - 1; ++i) { + sb.append(parseTopicConnections(appIdDetailArray, i)).append("#"); + } + connectionDO.setClientId(sb.length() > 0 ? sb.substring(0, sb.length() - 1) : ""); + + // 解析时间 + Long receiveTime = ConvertUtil.string2Long(parseTopicConnections(appIdDetailArray, appIdDetailArray.length - 1)); + if (receiveTime == null) { + connectionDO.setRealConnectTime(-1L); + } else { + connectionDO.setRealConnectTime(receiveTime); } connectionDO.setClusterId(clusterId); @@ -95,4 +112,8 @@ public class JsonUtils { } return connectionDOList; } + + private static String parseTopicConnections(String[] appIdDetailArray, int idx) { + return (appIdDetailArray != null && appIdDetailArray.length >= idx + 1)? appIdDetailArray[idx]: ""; + } } \ No newline at end of file diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/Tuple.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/Tuple.java new file mode 100644 index 00000000..8bdaf527 --- /dev/null +++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/Tuple.java @@ -0,0 +1,61 @@ +package com.xiaojukeji.kafka.manager.common.utils; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + +/** + * @Author: D10865 + * @Description: + * @Date: Create on 2018/5/29 下午4:08 + * @Modified By + */ +@JsonIgnoreProperties(value = { "hibernateLazyInitializer", "handler" }) +@Data +public class Tuple<T, V> { + + private T v1; + private V v2; + + public Tuple(){} + + public Tuple(T v1, V v2) { + this.v1 = v1; + this.v2 = v2; + } + + public T v1() { + return v1; + } + + public Tuple setV1(T v1) { + this.v1 = v1; + return this; + } + + public V v2() { + return v2; + } + + public Tuple setV2(V v2) { + this.v2 = v2; + return this; + } + + @Override + public boolean equals(Object o) { + if (this == o) {return true;} + if (o == null || getClass() != o.getClass()) {return false;} + + Tuple tuple = (Tuple) o; + + if (v1 != null ? !v1.equals(tuple.v1) : tuple.v1 != null) {return false;} + return v2 != null ? v2.equals(tuple.v2) : tuple.v2 == null; + } + + @Override + public int hashCode() { + int result = v1 != null ? v1.hashCode() : 0; + result = 31 * result + (v2 != null ? 
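// Usage sketch for the Tuple returned by HAUtils.splitKafkaUserAndClient (values
// hypothetical); the split is on the FIRST '#', so a clientId may itself contain '#':
//   mergeKafkaUserAndClient("userA", "P#orders") -> "userA#P#orders"
//   splitKafkaUserAndClient("userA#P#orders")    -> Tuple("userA", "P#orders")
//   splitKafkaUserAndClient("userA")             -> null (no '#' separator present)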
v2.hashCode() : 0); + return result; + } +} diff --git a/kafka-manager-console/package.json b/kafka-manager-console/package.json index 1362e7f6..80b8afcf 100644 --- a/kafka-manager-console/package.json +++ b/kafka-manager-console/package.json @@ -4,10 +4,10 @@ "description": "", "scripts": { "prestart": "npm install --save-dev webpack-dev-server", - "start": "webpack serve", + "start": "webpack-dev-server", "daily-build": "cross-env NODE_ENV=production webpack", "pre-build": "cross-env NODE_ENV=production webpack", - "prod-build": "cross-env NODE_ENV=production webpack", + "prod-build": "cross-env NODE_OPTIONS=--max-old-space-size=8000 NODE_ENV=production webpack", "fix-memory": "cross-env LIMIT=4096 increase-memory-limit" }, "author": "", @@ -52,10 +52,11 @@ "typescript": "^3.3.3333", "url-loader": "^4.1.1", "webpack": "^4.29.6", - "webpack-cli": "^4.9.1", + "webpack-cli": "^3.2.3", + "webpack-dev-server": "^3.11.3", "xlsx": "^0.16.1" }, "dependencies": { "format-to-json": "^1.0.4" } -} \ No newline at end of file +} diff --git a/kafka-manager-console/src/container/modal/admin/TopicHaRelation.tsx b/kafka-manager-console/src/container/modal/admin/TopicHaRelation.tsx index 8f3b128d..a287d1da 100644 --- a/kafka-manager-console/src/container/modal/admin/TopicHaRelation.tsx +++ b/kafka-manager-console/src/container/modal/admin/TopicHaRelation.tsx @@ -2,9 +2,11 @@ import * as React from 'react'; import { admin } from 'store/admin'; import { Modal, Form, Radio } from 'antd'; import { IBrokersMetadata, IBrokersRegions, IMetaData } from 'types/base-type'; -import { Alert, message, notification, Table, Tooltip, Transfer } from 'component/antd'; -import { getClusterHaTopicsStatus, setHaTopics, unbindHaTopics } from 'lib/api'; +import { Alert, message, notification, Table, Tooltip, Spin } from 'component/antd'; +import { getClusterHaTopicsStatus, getAppRelatedTopics, setHaTopics, unbindHaTopics } from 'lib/api'; import { cellStyle } from 'constants/table'; +import { renderAttributes, TransferTable, IKafkaUser } from './TopicHaSwitch' +import './index.less' const layout = { labelCol: { span: 3 }, @@ -24,6 +26,7 @@ interface IHaTopic { clusterId: number; clusterName: string; haRelation: number; + isHaRelation: boolean; topicName: string; key: string; disabled?: boolean; @@ -68,27 +71,104 @@ const resColumns = [ }, }, ]; + +const columns = [ + { + dataIndex: 'topicName', + title: '名称', + width: 260, + ellipsis: true, + }, +]; + +const kafkaUserColumn = [ + { + dataIndex: 'kafkaUser', + title: 'kafkaUser', + width: 100, + ellipsis: true, + }, + { + dataIndex: 'manualSelectedTopics', + title: '已选中Topic', + width: 120, + render: (text: string[]) => { + return text?.length ? renderAttributes({ data: text, limit: 3 }) : '-'; + }, + }, + { + dataIndex: 'autoSelectedTopics', + title: '选中关联Topic', + width: 120, + render: (text: string[]) => { + return text?.length ? 
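// Column semantics (as computed in getAppRelatedTopicList below): manualSelectedTopics
// are topics the user ticked directly, autoSelectedTopics are the extra topics pulled in
// because they share a kafkaUser with a ticked topic.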
renderAttributes({ data: text, limit: 3 }) : '-'; + }, + }, +]; + class TopicHaRelation extends React.Component { public state = { radioCheck: 'spec', - haTopics: [] as IHaTopic[], + topics: [] as IHaTopic[], + kafkaUsers: [] as IKafkaUser[], targetKeys: [] as string[], + selectedKeys: [] as string[], confirmLoading: false, firstMove: true, primaryActiveKeys: [] as string[], primaryStandbyKeys: [] as string[], + manualSelectedKeys: [] as string[], + spinLoading: false, }; + public selectSingle = null as boolean; + public manualSelectedNames = [] as string[]; + + public setSelectSingle = (val: boolean) => { + this.selectSingle = val; + } + public setManualSelectedNames = (keys: string[]) => { + // this.manualSelectedNames = this.getTopicsByKeys(keys); + this.manualSelectedNames = keys; + } + + public filterManualSelectedKeys = (key: string, selected: boolean) => { + const newManualSelectedKeys = [...this.state.manualSelectedKeys]; + const index = newManualSelectedKeys.findIndex(item => item === key); + if (selected) { + if (index === -1) newManualSelectedKeys.push(key); + } else { + if (index !== -1) newManualSelectedKeys.splice(index, 1); + } + this.setManualSelectedNames(newManualSelectedKeys); + this.setState({ + manualSelectedKeys: newManualSelectedKeys, + }); + } + + public getManualSelected = (single: boolean, key?: any, selected?: boolean) => { + this.setSelectSingle(single); + if (single) { + this.filterManualSelectedKeys(key, selected); + } else { + this.setManualSelectedNames(key); + this.setState({ + manualSelectedKeys: key, + }); + } + } + public handleOk = () => { this.props.form.validateFields((err: any, values: any) => { - const unbindTopics = []; - const bindTopics = []; + const { primaryStandbyKeys, targetKeys} = this.state; + const unbindKeys = []; + const bindKeys = []; if (values.rule === 'all') { setHaTopics({ all: true, activeClusterId: this.props.currentCluster.clusterId, - standbyClusterId: this.props.currentCluster.haClusterVO.clusterId, + standbyClusterId: this.props.currentCluster.haClusterVO?.clusterId, topicNames: [], }).then(res => { handleMsg(res, '关联成功'); @@ -100,18 +180,18 @@ class TopicHaRelation extends React.Component { return; } - for (const item of this.state.primaryStandbyKeys) { - if (!this.state.targetKeys.includes(item)) { - unbindTopics.push(item); + for (const item of primaryStandbyKeys) { + if (!targetKeys.includes(item)) { + unbindKeys.push(item); } } - for (const item of this.state.targetKeys) { - if (!this.state.primaryStandbyKeys.includes(item)) { - bindTopics.push(item); + for (const item of targetKeys) { + if (!primaryStandbyKeys.includes(item)) { + bindKeys.push(item); } } - if (!unbindTopics.length && !bindTopics.length) { + if (!unbindKeys.length && !bindKeys.length) { return message.info('请选择您要操作的Topic'); } @@ -140,15 +220,16 @@ class TopicHaRelation extends React.Component { this.props.reload(); }; - if (bindTopics.length) { + if (bindKeys.length) { this.setState({ confirmLoading: true, }); setHaTopics({ all: false, activeClusterId: this.props.currentCluster.clusterId, - standbyClusterId: this.props.currentCluster.haClusterVO.clusterId, - topicNames: bindTopics, + standbyClusterId: this.props.currentCluster.haClusterVO?.clusterId, + // topicNames: this.getTopicsByKeys(bindKeys), + topicNames: bindKeys, }).then(res => { this.setState({ confirmLoading: false, @@ -158,15 +239,17 @@ class TopicHaRelation extends React.Component { }); } - if (unbindTopics.length) { + if (unbindKeys.length) { this.setState({ confirmLoading: true, }); 
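// Example with hypothetical keys: primaryStandbyKeys = ['t1', 't2'] and
// targetKeys = ['t2', 't3'] yield unbindKeys = ['t1'] and bindKeys = ['t3'];
// each non-empty list fires its own setHaTopics/unbindHaTopics request.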
unbindHaTopics({ all: false, activeClusterId: this.props.currentCluster.clusterId, - standbyClusterId: this.props.currentCluster.haClusterVO.clusterId, - topicNames: unbindTopics, + standbyClusterId: this.props.currentCluster.haClusterVO?.clusterId, + // topicNames: this.getTopicsByKeys(unbindKeys), + topicNames: unbindKeys, + retainStandbyResource: values.retainStandbyResource, }).then(res => { this.setState({ confirmLoading: false, @@ -194,43 +277,253 @@ class TopicHaRelation extends React.Component { let isReset = false; // 判断当前移动是否还原为最初的状态 if (primaryStandbyKeys.length === targetKeys.length) { - targetKeys.sort((a, b) => +a - (+b)); - primaryStandbyKeys.sort((a, b) => +a - (+b)); - let i = 0; - while (i < targetKeys.length) { - if (targetKeys[i] === primaryStandbyKeys[i]) { - i++; - } else { - break; - } - } - isReset = i === targetKeys.length; + const diff = targetKeys.find(item => primaryStandbyKeys.indexOf(item) < 0); + isReset = diff ? false : true; } return isReset; } + public getNewSelectKeys = (removeKeys: string[], selectedKeys: string[]) => { + const { topics, kafkaUsers } = this.state; + // 根据移除的key找与该key关联的其他key,一起移除 + let relatedTopics: string[] = []; + const relatedKeys: string[] = []; + const newSelectKeys = []; + for (const key of removeKeys) { + const topicName = topics.find(row => row.key === key)?.topicName; + for (const item of kafkaUsers) { + if (item.selectedTopicNameList.includes(topicName)) { + relatedTopics = relatedTopics.concat(item.selectedTopicNameList, item.notSelectTopicNameList); + } + } + for (const item of relatedTopics) { + const key = topics.find(row => row.topicName === item)?.key; + if (key) { + relatedKeys.push(key); + } + } + for (const key of selectedKeys) { + if (!relatedKeys.includes(key)) { + newSelectKeys.push(key); + } + } + } + return newSelectKeys; + } + public setTopicsStatus = (targetKeys: string[], disabled: boolean, isAll = false) => { - const { haTopics } = this.state; - const newTopics = Array.from(haTopics); + const { topics } = this.state; + const newTopics = Array.from(topics); if (isAll) { - for (let i = 0; i < haTopics.length; i++) { + for (let i = 0; i < topics.length; i++) { newTopics[i].disabled = disabled; } } else { for (const key of targetKeys) { - const index = haTopics.findIndex(item => item.key === key); + const index = topics.findIndex(item => item.key === key); if (index > -1) { newTopics[index].disabled = disabled; } } } this.setState(({ - haTopics: newTopics, + topics: newTopics, })); } - public onTransferChange = (targetKeys: string[], direction: string, moveKeys: string[]) => { - const { primaryStandbyKeys, firstMove, primaryActiveKeys } = this.state; + public getTopicsByKeys = (keys: string[]) => { + // 依据key值找topicName + const topicNames: string[] = []; + for (const key of keys) { + const topicName = this.state.topics.find(item => item.key === key)?.topicName; + if (topicName) { + topicNames.push(topicName); + } + } + return topicNames; + } + + public getNewKafkaUser = (targetKeys: string[]) => { + const { primaryStandbyKeys, kafkaUsers, topics } = this.state; + const removeKeys = []; + const addKeys = []; + for (const key of primaryStandbyKeys) { + if (targetKeys.indexOf(key) < 0) { + // 移除的 + removeKeys.push(key); + } + } + for (const key of targetKeys) { + if (primaryStandbyKeys.indexOf(key) < 0) { + // 新增的 + addKeys.push(key); + } + } + const keepKeys = [...removeKeys, ...addKeys]; + const newKafkaUsers = kafkaUsers; + + const moveTopics = this.getTopicsByKeys(keepKeys); + + for (const topic of moveTopics) { 
+ for (const item of newKafkaUsers) { + if (item.selectedTopicNameList.includes(topic)) { + item.show = true; + } + } + } + + const showKafaUsers = newKafkaUsers.filter(item => item.show === true); + + for (const item of showKafaUsers) { + let i = 0; + while (i < moveTopics.length) { + if (!item.selectedTopicNameList.includes(moveTopics[i])) { + i++; + } else { + break; + } + } + + // 表示该kafkaUser不该展示 + if (i === moveTopics.length) { + item.show = false; + } + } + + return showKafaUsers; + } + + public getAppRelatedTopicList = (selectedKeys: string[]) => { + const { topics, targetKeys, primaryStandbyKeys, kafkaUsers } = this.state; + const filterTopicNameList = this.getTopicsByKeys(selectedKeys); + const isReset = this.isPrimaryStatus(targetKeys); + + if (!filterTopicNameList.length && isReset) { + // targetKeys + this.setState({ + kafkaUsers: kafkaUsers.map(item => ({ + ...item, + show: false, + })), + }); + return; + } else { + // 保留选中项与移动的的项 + this.setState({ + kafkaUsers: this.getNewKafkaUser(targetKeys), + }); + } + + // 单向选择,所以取当前值的clusterId + const clusterInfo = topics.find(item => item.topicName === filterTopicNameList[0]); + const clusterPhyId = clusterInfo?.clusterId; + if (!clusterPhyId) return; + this.setState({spinLoading: true}); + getAppRelatedTopics({ + clusterPhyId, + filterTopicNameList, + ha: clusterInfo.isHaRelation, + useKafkaUserAndClientId: false, + }).then((res: IKafkaUser[]) => { + let notSelectTopicNames: string[] = []; + const notSelectTopicKeys: string[] = []; + for (const item of (res || [])) { + notSelectTopicNames = notSelectTopicNames.concat(item.notSelectTopicNameList || []); + } + + for (const item of notSelectTopicNames) { + const key = topics.find(row => row.topicName === item)?.key; + + if (key && notSelectTopicKeys.indexOf(key) < 0) { + notSelectTopicKeys.push(key); + } + } + + const newSelectedKeys = selectedKeys.concat(notSelectTopicKeys); + const newKafkaUsers = (res || []).map(item => ({ + ...item, + show: true, + manualSelectedTopics: item.selectedTopicNameList.filter(topic => this.manualSelectedNames.indexOf(topic) > -1), + autoSelectedTopics: [...item.selectedTopicNameList, ...item.notSelectTopicNameList].filter(topic => this.manualSelectedNames.indexOf(topic) === -1), + })); + const { kafkaUsers } = this.state; + + for (const item of kafkaUsers) { + const resItem = res.find(row => row.kafkaUser === item.kafkaUser); + if (!resItem) { + newKafkaUsers.push(item); + } + } + this.setState({ + kafkaUsers: newKafkaUsers, + selectedKeys: newSelectedKeys, + }); + + if (notSelectTopicKeys.length) { + this.getAppRelatedTopicList(newSelectedKeys); + } + }).finally(() => { + this.setState({spinLoading: false}); + }); + } + + public getRelatedKeys = (currentKeys: string[]) => { + // 未被选中的项 + const removeKeys = []; + // 对比上一次记录的选中的值找出本次取消的项 + const { selectedKeys } = this.state; + for (const preKey of selectedKeys) { + if (!currentKeys.includes(preKey)) { + removeKeys.push(preKey); + } + } + + return removeKeys?.length ? 
this.getNewSelectKeys(removeKeys, currentKeys) : currentKeys; + } + + public handleTopicChange = (sourceSelectedKeys: string[], targetSelectedKeys: string[]) => { + if (this.selectSingle) { + this.setSelectSingle(false); + } else { + this.getManualSelected(false, [...sourceSelectedKeys, ...targetSelectedKeys]) + } + const { topics, targetKeys } = this.state; + // 条件限制只允许选中一边,单向操作 + const keys = [...sourceSelectedKeys, ...targetSelectedKeys]; + + // 判断当前选中项属于哪一类 + if (keys.length) { + const isHaRelation = topics.find(item => item.key === keys[0])?.isHaRelation; + const needDisabledKeys = topics.filter(item => item.isHaRelation !== isHaRelation).map(row => row.key); + this.setTopicsStatus(needDisabledKeys, true); + } + const selectedKeys = this.state.selectedKeys.length ? this.getRelatedKeys(keys) : keys; + + const isReset = this.isPrimaryStatus(targetKeys); + if (!selectedKeys.length && isReset) { + this.setTopicsStatus([], false, true); + } + this.setState({ + selectedKeys, + }); + this.getAppRelatedTopicList(selectedKeys); + } + + public onDirectChange = (targetKeys: string[], direction: string, moveKeys: string[]) => { + const { primaryStandbyKeys, firstMove, primaryActiveKeys, kafkaUsers } = this.state; + + const getKafkaUser = () => { + const newKafkaUsers = kafkaUsers; + const moveTopics = this.getTopicsByKeys(moveKeys); + for (const topic of moveTopics) { + for (const item of newKafkaUsers) { + if (item.selectedTopicNameList.includes(topic)) { + item.show = true; + } + } + } + return newKafkaUsers; + }; // 判断当前移动是否还原为最初的状态 const isReset = this.isPrimaryStatus(targetKeys); if (firstMove) { @@ -238,24 +531,26 @@ class TopicHaRelation extends React.Component { this.setTopicsStatus(primaryKeys, true, false); this.setState(({ firstMove: false, + kafkaUsers: getKafkaUser(), targetKeys, })); return; } - // 如果是还原为初始状态则还原禁用状态 if (isReset) { this.setTopicsStatus([], false, true); this.setState(({ firstMove: true, targetKeys, + kafkaUsers: [], })); return; } - this.setState({ + this.setState(({ targetKeys, - }); + kafkaUsers: this.getNewKafkaUser(targetKeys), + })); } public componentDidMount() { @@ -265,17 +560,19 @@ class TopicHaRelation extends React.Component { ]).then(([activeRes, standbyRes]: IHaTopic[][]) => { activeRes = (activeRes || []).map(row => ({ ...row, + isHaRelation: row.haRelation === 1 || row.haRelation === 0, key: row.topicName, - })).filter(item => item.haRelation === null); + })).filter(item => !item.isHaRelation); standbyRes = (standbyRes || []).map(row => ({ ...row, + isHaRelation: row.haRelation === 1 || row.haRelation === 0, key: row.topicName, - })).filter(item => item.haRelation === 1 || item.haRelation === 0); + })).filter(item => item.isHaRelation); this.setState({ - haTopics: [].concat([...activeRes, ...standbyRes]).sort((a, b) => a.topicName.localeCompare(b.topicName)), - primaryActiveKeys: activeRes.map(row => row.topicName), - primaryStandbyKeys: standbyRes.map(row => row.topicName), - targetKeys: standbyRes.map(row => row.topicName), + topics: [].concat([...activeRes, ...standbyRes]).sort((a, b) => a.topicName.localeCompare(b.topicName)), + primaryActiveKeys: activeRes.map(row => row.key), + primaryStandbyKeys: standbyRes.map(row => row.key), + targetKeys: standbyRes.map(row => row.key), }); }); } @@ -287,6 +584,9 @@ class TopicHaRelation extends React.Component { metadata = admin.brokersMetadata ? admin.brokersMetadata : metadata; let regions = [] as IBrokersRegions[]; regions = admin.brokersRegions ? 
admin.brokersRegions : regions; + const { kafkaUsers, confirmLoading, radioCheck, targetKeys, selectedKeys, topics, primaryStandbyKeys, spinLoading} = this.state; + const tableData = kafkaUsers.filter(row => row.show); + return ( <> { onOk={this.handleOk} onCancel={this.handleCancel} maskClosable={false} - confirmLoading={this.state.confirmLoading} - width={590} + confirmLoading={confirmLoading} + width={800} okText="确认" cancelText="取消" > -
- {/* - {getFieldDecorator('rule', { - initialValue: 'spec', - rules: [{ - required: true, - message: '请选择规则', - }], - })( - 应用于所有Topic - 应用于特定Topic - )} - */} - {this.state.radioCheck === 'spec' ? - {getFieldDecorator('topicNames', { - initialValue: this.state.targetKeys, - rules: [{ - required: false, - message: '请选择Topic', - }], - })( - item.topicName} - titles={['未关联', '已关联']} - locale={{ - itemUnit: '', - itemsUnit: '', - }} - />, - )} - : ''} -
+ +
+ {/* + {getFieldDecorator('rule', { + initialValue: 'spec', + rules: [{ + required: true, + message: '请选择规则', + }], + })( + 应用于所有Topic + 应用于特定Topic + )} + */} + {radioCheck === 'spec' ? + {getFieldDecorator('topicNames', { + initialValue: targetKeys, + rules: [{ + required: false, + message: '请选择Topic', + }], + })( + , + )} + : null} + {radioCheck === 'spec' ? : null} + {targetKeys.length < primaryStandbyKeys.length ? + {getFieldDecorator('retainStandbyResource', { + initialValue: false, + rules: [{ + required: true, + message: '请选择数据清理策略', + }], + })( + 删除备集群所有数据 + 保留备集群所有数据 + )} + : null} + + ); diff --git a/kafka-manager-console/src/container/modal/admin/TopicHaSwitch.tsx b/kafka-manager-console/src/container/modal/admin/TopicHaSwitch.tsx index 78c5565b..aa5d6e9c 100644 --- a/kafka-manager-console/src/container/modal/admin/TopicHaSwitch.tsx +++ b/kafka-manager-console/src/container/modal/admin/TopicHaSwitch.tsx @@ -1,12 +1,13 @@ import * as React from 'react'; import { admin } from 'store/admin'; -import { Modal, Form, Radio, Tag, Popover, Button } from 'antd'; +import { Modal, Form, Radio, Tag, Popover, Button, Tooltip, Spin } from 'antd'; import { IBrokersMetadata, IBrokersRegions, IMetaData } from 'types/base-type'; import { Alert, Icon, message, Table, Transfer } from 'component/antd'; import { getClusterHaTopics, getAppRelatedTopics, createSwitchTask } from 'lib/api'; import { TooltipPlacement } from 'antd/es/tooltip'; import * as XLSX from 'xlsx'; import moment from 'moment'; +import { cloneDeep } from "lodash"; import { timeMinute } from 'constants/strategy'; const layout = { @@ -35,13 +36,17 @@ interface IHaTopic { disabled?: boolean; } -interface IKafkaUser { +export interface IKafkaUser { clusterPhyId: number; kafkaUser: string; notHaTopicNameList: string[]; notSelectTopicNameList: string[]; selectedTopicNameList: string[]; show: boolean; + manualSelectedTopics: string[]; + autoSelectedTopics: string[]; + clientId?: string; + haClientIdList?: string[] } const columns = [ @@ -71,17 +76,23 @@ const kafkaUserColumn = [ ellipsis: true, }, { - dataIndex: 'selectedTopicNameList', + dataIndex: 'clientId', + title: 'clientID', + width: 100, + ellipsis: true, + }, + { + dataIndex: 'manualSelectedTopics', title: '已选中Topic', - width: 120, + // width: 120, render: (text: string[]) => { return text?.length ? renderAttributes({ data: text, limit: 3 }) : '-'; }, }, { - dataIndex: 'notSelectTopicNameList', + dataIndex: 'autoSelectedTopics', title: '选中关联Topic', - width: 120, + // width: 120, render: (text: string[]) => { return text?.length ? renderAttributes({ data: text, limit: 3 }) : '-'; }, @@ -89,7 +100,7 @@ const kafkaUserColumn = [ { dataIndex: 'notHaTopicNameList', title: '未建立HA Topic', - width: 120, + // width: 120, render: (text: string[]) => { return text?.length ? 
renderAttributes({ data: text, limit: 3 }) : '-'; }, @@ -135,31 +146,64 @@ export const renderAttributes = (params: { class TopicHaSwitch extends React.Component { public state = { radioCheck: 'spec', + switchMode: 'kafkaUser', targetKeys: [] as string[], selectedKeys: [] as string[], topics: [] as IHaTopic[], kafkaUsers: [] as IKafkaUser[], + primaryTopics: [] as string[], primaryActiveKeys: [] as string[], primaryStandbyKeys: [] as string[], firstMove: true, + manualSelectedKeys: [] as string[], + selectTableColumn: kafkaUserColumn.filter(item => item.title !== 'clientID') as [], + spinLoading: false, }; + public selectSingle = null as boolean; + public manualSelectedNames = [] as string[]; + + public setSelectSingle = (val: boolean) => { + this.selectSingle = val; + } + public setManualSelectedNames = (keys: string[]) => { + // this.manualSelectedNames = this.getTopicsByKeys(keys); + this.manualSelectedNames = keys; + } + + public filterManualSelectedKeys = (key: string, selected: boolean) => { + const newManualSelectedKeys = [...this.state.manualSelectedKeys]; + const index = newManualSelectedKeys.findIndex(item => item === key); + if (selected) { + if (index === -1) newManualSelectedKeys.push(key); + } else { + if (index !== -1) newManualSelectedKeys.splice(index, 1); + } + this.setManualSelectedNames(newManualSelectedKeys); + this.setState({ + manualSelectedKeys: newManualSelectedKeys, + }); + } + + public getManualSelected = (single: boolean, key?: any, selected?: boolean) => { + this.setSelectSingle(single); + if (single) { + this.filterManualSelectedKeys(key, selected); + } else { + this.setManualSelectedNames(key); + this.setState({ + manualSelectedKeys: key, + }); + } + } + public isPrimaryStatus = (targetKeys: string[]) => { const { primaryStandbyKeys } = this.state; let isReset = false; // 判断当前移动是否还原为最初的状态 if (primaryStandbyKeys.length === targetKeys.length) { - targetKeys.sort((a, b) => +a - (+b)); - primaryStandbyKeys.sort((a, b) => +a - (+b)); - let i = 0; - while (i < targetKeys.length) { - if (targetKeys[i] === primaryStandbyKeys[i]) { - i++; - } else { - break; - } - } - isReset = i === targetKeys.length; + const diff = targetKeys.find(item => primaryStandbyKeys.indexOf(item) < 0); + isReset = diff ? 
false : true; } return isReset; } @@ -168,16 +212,17 @@ class TopicHaSwitch extends React.Component { const targetTopics = []; for (const key of currentKeys) { if (!primaryKeys.includes(key)) { - const topic = this.state.topics.find(item => item.key === key)?.topicName; - targetTopics.push(topic); + // const topic = this.state.topics.find(item => item.key === key)?.topicName; + // targetTopics.push(topic); + targetTopics.push(key); } } return targetTopics; } public handleOk = () => { - const { primaryStandbyKeys, primaryActiveKeys, topics } = this.state; - const standbyClusterId = this.props.currentCluster.haClusterVO.clusterId; + const { primaryStandbyKeys, primaryActiveKeys, topics, kafkaUsers, switchMode } = this.state; + const standbyClusterId = this.props.currentCluster.haClusterVO?.clusterId; const activeClusterId = this.props.currentCluster.clusterId; this.props.form.validateFields((err: any, values: any) => { @@ -188,6 +233,7 @@ class TopicHaSwitch extends React.Component { all: true, mustContainAllKafkaUserTopics: true, standbyClusterPhyId: standbyClusterId, + kafkaUserAndClientIdList: [], topicNameList: [], }).then(res => { message.success('任务创建成功'); @@ -217,11 +263,28 @@ class TopicHaSwitch extends React.Component { const activeClusterPhyId = currentStandbyKeys.length > primaryStandbyKeys.length ? standbyClusterId : activeClusterId; const standbyClusterPhyId = currentStandbyKeys.length > primaryStandbyKeys.length ? activeClusterId : standbyClusterId; const targetTopics = this.getTargetTopics(currentKeys, primaryKeys); + const clientIdParams = kafkaUsers.map(item => ({ clientId: item.clientId, kafkaUser: item.kafkaUser })); + const kafkaUserParams = [] as any; + kafkaUsers.forEach(item => { + kafkaUserParams.push({ + clientId: null, + kafkaUser: item.kafkaUser, + }); + if (item.haClientIdList?.length) { + item.haClientIdList.forEach(clientId => { + kafkaUserParams.push({ + clientId, + kafkaUser: item.kafkaUser, + }); + }); + } + }); createSwitchTask({ activeClusterPhyId, all: false, - mustContainAllKafkaUserTopics: true, + mustContainAllKafkaUserTopics: switchMode === 'kafkaUser', standbyClusterPhyId, + kafkaUserAndClientIdList: switchMode === 'clientID' ? 
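// Sketch of the resulting request body (field names per ASSwitchJobDTO; values
// hypothetical): { activeClusterPhyId: 1, standbyClusterPhyId: 2, all: false,
//   mustContainAllKafkaUserTopics: switchMode === 'kafkaUser',
//   kafkaUserAndClientIdList: [{ kafkaUser: 'userA', clientId: 'P#orders' }],
//   topicNameList: ['topic-a'] }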
clientIdParams : kafkaUserParams, topicNameList: targetTopics, }).then(res => { message.success('任务创建成功'); @@ -252,8 +315,7 @@ class TopicHaSwitch extends React.Component { const topicName = topics.find(row => row.key === key)?.topicName; for (const item of kafkaUsers) { if (item.selectedTopicNameList.includes(topicName)) { - relatedTopics = relatedTopics.concat(item.selectedTopicNameList); - relatedTopics = relatedTopics.concat(item.notSelectTopicNameList); + relatedTopics = relatedTopics.concat(item.selectedTopicNameList, item.notSelectTopicNameList); } } for (const item of relatedTopics) { @@ -291,21 +353,20 @@ class TopicHaSwitch extends React.Component { })); } - public getFilterTopics = (selectKeys: string[]) => { + public getTopicsByKeys = (keys: string[]) => { // 依据key值找topicName - const filterTopics: string[] = []; - const targetKeys = selectKeys; - for (const key of targetKeys) { + const topicNames: string[] = []; + for (const key of keys) { const topicName = this.state.topics.find(item => item.key === key)?.topicName; if (topicName) { - filterTopics.push(topicName); + topicNames.push(topicName); } } - return filterTopics; + return topicNames; } public getNewKafkaUser = (targetKeys: string[]) => { - const { primaryStandbyKeys, topics } = this.state; + const { primaryStandbyKeys, kafkaUsers, topics } = this.state; const removeKeys = []; const addKeys = []; for (const key of primaryStandbyKeys) { @@ -321,9 +382,9 @@ class TopicHaSwitch extends React.Component { } } const keepKeys = [...removeKeys, ...addKeys]; - const newKafkaUsers = this.state.kafkaUsers; + const newKafkaUsers = kafkaUsers; - const moveTopics = this.getFilterTopics(keepKeys); + const moveTopics = this.getTopicsByKeys(keepKeys); for (const topic of moveTopics) { for (const item of newKafkaUsers) { @@ -355,8 +416,8 @@ class TopicHaSwitch extends React.Component { } public getAppRelatedTopicList = (selectedKeys: string[]) => { - const { topics, targetKeys, primaryStandbyKeys, kafkaUsers } = this.state; - const filterTopicNameList = this.getFilterTopics(selectedKeys); + const { topics, targetKeys, primaryStandbyKeys, kafkaUsers, switchMode } = this.state; + const filterTopicNameList = this.getTopicsByKeys(selectedKeys); const isReset = this.isPrimaryStatus(targetKeys); if (!filterTopicNameList.length && isReset) { @@ -376,10 +437,14 @@ class TopicHaSwitch extends React.Component { } // 单向选择,所以取当前值的aactiveClusterId - const clusterPhyId = topics.find(item => item.topicName === filterTopicNameList[0]).activeClusterId; + const clusterPhyId = topics.find(item => item.topicName === filterTopicNameList[0])?.activeClusterId; + if (!clusterPhyId) return; + this.setState({spinLoading: true}); getAppRelatedTopics({ clusterPhyId, filterTopicNameList, + ha: true, + useKafkaUserAndClientId: switchMode === 'clientID', }).then((res: IKafkaUser[]) => { let notSelectTopicNames: string[] = []; const notSelectTopicKeys: string[] = []; @@ -390,7 +455,7 @@ class TopicHaSwitch extends React.Component { for (const item of notSelectTopicNames) { const key = topics.find(row => row.topicName === item)?.key; - if (key) { + if (key && notSelectTopicKeys.indexOf(key) < 0) { notSelectTopicKeys.push(key); } } @@ -399,11 +464,13 @@ class TopicHaSwitch extends React.Component { const newKafkaUsers = (res || []).map(item => ({ ...item, show: true, + manualSelectedTopics: item.selectedTopicNameList.filter(topic => this.manualSelectedNames.indexOf(topic) > -1), + autoSelectedTopics: [...item.selectedTopicNameList, 
...item.notSelectTopicNameList].filter(topic => this.manualSelectedNames.indexOf(topic) === -1), })); const { kafkaUsers } = this.state; for (const item of kafkaUsers) { - const resItem = res.find(row => row.kafkaUser === item.kafkaUser); + const resItem = res.find(row => switchMode === 'clientID' ? row.kafkaUser === item.kafkaUser && row.clientId === item.clientId : row.kafkaUser === item.kafkaUser); if (!resItem) { newKafkaUsers.push(item); } @@ -416,6 +483,8 @@ class TopicHaSwitch extends React.Component { if (notSelectTopicKeys.length) { this.getAppRelatedTopicList(newSelectedKeys); } + }).finally(() => { + this.setState({spinLoading: false}); }); } @@ -440,7 +509,7 @@ class TopicHaSwitch extends React.Component { // 判断当前选中项属于哪一类 if (keys.length) { - const activeClusterId = topics.find(item => item.key === keys[0]).activeClusterId; + const activeClusterId = topics.find(item => item.key === keys[0])?.activeClusterId; const needDisabledKeys = topics.filter(item => item.activeClusterId !== activeClusterId).map(row => row.key); this.setTopicsStatus(needDisabledKeys, true); } @@ -457,11 +526,11 @@ class TopicHaSwitch extends React.Component { } public onDirectChange = (targetKeys: string[], direction: string, moveKeys: string[]) => { - const { primaryStandbyKeys, firstMove, primaryActiveKeys, topics } = this.state; + const { primaryStandbyKeys, firstMove, primaryActiveKeys, kafkaUsers, topics } = this.state; const getKafkaUser = () => { - const newKafkaUsers = this.state.kafkaUsers; - const moveTopics = this.getFilterTopics(moveKeys); + const newKafkaUsers = kafkaUsers; + const moveTopics = this.getTopicsByKeys(moveKeys); for (const topic of moveTopics) { for (const item of newKafkaUsers) { if (item.selectedTopicNameList.includes(topic)) { @@ -503,21 +572,27 @@ class TopicHaSwitch extends React.Component { } public downloadData = () => { - const { kafkaUsers } = this.state; + const { kafkaUsers, switchMode } = this.state; const tableData = kafkaUsers.map(item => { - return { + const column = { // tslint:disable 'kafkaUser': item.kafkaUser, - '已选中Topic': item.selectedTopicNameList?.join('、'), - '选中关联Topic': item.notSelectTopicNameList?.join('、'), + 'clientID': item.clientId, + '已选中Topic': item.manualSelectedTopics?.join('、'), + '选中关联Topic': item.autoSelectedTopics?.join('、'), '未建立HA Topic': item.notHaTopicNameList?.join(`、`), }; + if (switchMode === 'kafkaUser') { + delete column.clientID + } + return column; }); const data = [].concat(tableData); const wb = XLSX.utils.book_new(); // json转sheet + const header = ['kafkaUser', 'clientID', '已选中Topic', '选中关联Topic', '未建立HA Topic']; const ws = XLSX.utils.json_to_sheet(data, { - header: ['kafkaUser', '已选中Topic', '选中关联Topic', '未建立HA Topic'], + header: switchMode === 'kafkaUser' ? header.filter(item => item !== 'clientID') : header, }); // XLSX.utils. XLSX.utils.book_append_sheet(wb, ws, 'kafkaUser'); @@ -537,18 +612,38 @@ class TopicHaSwitch extends React.Component { return false; } + public onModeChange = (e: any) => { + const mode = e.target.value; + // 切换方式变更时,初始化数据 + const { primaryTopics, primaryStandbyKeys } = this.state; + this.setState({ + switchMode: mode, + topics: cloneDeep(primaryTopics), + targetKeys: primaryStandbyKeys, + selectedKeys: [], + kafkaUsers: [], + firstMove: true, + manualSelectedKeys: [], + selectTableColumn: mode === 'kafkaUser' ? 
kafkaUserColumn.filter(item => item.title !== 'clientID') : kafkaUserColumn, + }); + this.props.form.setFieldsValue({targetKeys: primaryStandbyKeys}); + this.setSelectSingle(null); + this.setManualSelectedNames([]); + } + public componentDidMount() { - const standbyClusterId = this.props.currentCluster.haClusterVO.clusterId; + const standbyClusterId = this.props.currentCluster.haClusterVO?.clusterId; const activeClusterId = this.props.currentCluster.clusterId; - getClusterHaTopics(this.props.currentCluster.clusterId, standbyClusterId).then((res: IHaTopic[]) => { - res = res.map((item, index) => ({ - key: index.toString(), + getClusterHaTopics(activeClusterId, standbyClusterId).then((res: IHaTopic[]) => { + res = res.map((item) => ({ + key: item.topicName, ...item, })); const targetKeys = (res || []).filter((item) => item.activeClusterId === standbyClusterId).map(row => row.key); const primaryActiveKeys = (res || []).filter((item) => item.activeClusterId === activeClusterId).map(row => row.key); this.setState({ topics: res || [], + primaryTopics: cloneDeep(res) || [], primaryStandbyKeys: targetKeys, primaryActiveKeys, targetKeys, @@ -563,7 +658,15 @@ class TopicHaSwitch extends React.Component { metadata = admin.brokersMetadata ? admin.brokersMetadata : metadata; let regions = [] as IBrokersRegions[]; regions = admin.brokersRegions ? admin.brokersRegions : regions; - const tableData = this.state.kafkaUsers.filter(row => row.show); + const { switchMode, kafkaUsers, radioCheck, targetKeys, selectedKeys, topics, selectTableColumn, spinLoading } = this.state; + const tableData = kafkaUsers.filter(row => row.show); + const rulesNode = ( +
+            <div>1、符合规范的ClientID格式为P#或C#前缀,分别代表生产者客户端和消费者客户端。</div>
+            <div>2、此处只展示符合规范的ClientID格式的高可用Topic。若未找到所需Topic,请检查ClientID的格式是否正确。</div>
+ ); return ( { } > -
- {/* - {getFieldDecorator('rule', { - initialValue: 'spec', - rules: [{ - required: true, - message: '请选择规则', - }], - })( - 应用于所有Topic - 应用于特定Topic - )} - */} - {this.state.radioCheck === 'spec' ? - {getFieldDecorator('targetKeys', { - initialValue: this.state.targetKeys, - rules: [{ - required: false, - message: '请选择Topic', - }], - })( - , - )} - : ''} - - {this.state.radioCheck === 'spec' ? - <> -
- {this.state.kafkaUsers.length ? : null} - - : null} + +
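+          {/* switch dimension: by kafkaUser only, or by kafkaUser + clientID */}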
+ + {getFieldDecorator('switchMode', { + initialValue: 'kafkaUser', + rules: [{ + required: true, + message: '请选择切换维度', + }], + })( + kafkaUser + kafkaUser + clientID + )} + + {/* + {getFieldDecorator('rule', { + initialValue: 'spec', + rules: [{ + required: true, + message: '请选择规则', + }], + })( + 应用于所有Topic + 应用于特定Topic + )} + */} + {switchMode === 'clientID' &&
+            <Popover content={rulesNode}>
+              <a>规则说明</a>
+            </Popover>
} + {radioCheck === 'spec' ? + {getFieldDecorator('targetKeys', { + initialValue: targetKeys, + rules: [{ + required: false, + message: '请选择Topic', + }], + })( + , + )} + : null} + + {radioCheck === 'spec' ? + <> +
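+              {/* table data comes from the kafkaUsers entries flagged show=true */}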
+ {tableData.length ? : null} + + : null} + ); } } export const TopicSwitchWrapper = Form.create()(TopicHaSwitch); -const TableTransfer = ({ leftColumns, ...restProps }: any) => ( +export const TableTransfer = ({ leftColumns, getManualSelected, tableAttrs, ...restProps }: any) => ( {({ filteredItems, @@ -651,6 +775,7 @@ const TableTransfer = ({ leftColumns, ...restProps }: any) => ( disabled: item.disabled, }), onSelect({ key }: any, selected: any) { + getManualSelected(true, key, selected); onItemSelect(key, selected); }, selectedRowKeys: listSelectedKeys, @@ -668,9 +793,11 @@ const TableTransfer = ({ leftColumns, ...restProps }: any) => ( onRow={({ key, disabled }) => ({ onClick: () => { if (disabled) return; + getManualSelected(true, key, listSelectedKeys.includes(key)); onItemSelect(key, !listSelectedKeys.includes(key)); }, })} + {...tableAttrs} /> ); }} @@ -683,8 +810,12 @@ interface IProps { onDirectChange?: any; currentCluster: any; topicChange: any; + columns: any[]; dataSource: any[]; selectedKeys: string[]; + getManualSelected: any; + transferAttrs?: any; + tableAttrs?: any; } export class TransferTable extends React.Component { @@ -695,7 +826,7 @@ export class TransferTable extends React.Component { } public render() { - const { currentCluster, dataSource, value, topicChange, selectedKeys } = this.props; + const { currentCluster, columns, dataSource, value, topicChange, selectedKeys, getManualSelected, transferAttrs, tableAttrs } = this.props; return (
{ showSearch={true} onChange={this.onChange} onSelectChange={topicChange} + filterOption={(inputValue: string, item: any) => item.topicName?.indexOf(inputValue) > -1} leftColumns={columns} - titles={[`集群${currentCluster.clusterName}`, `集群${currentCluster.haClusterVO.clusterName}`]} + titles={[`集群${currentCluster.clusterName}`, `集群${currentCluster.haClusterVO?.clusterName}`]} locale={{ itemUnit: '', itemsUnit: '', }} + getManualSelected={getManualSelected} + tableAttrs={tableAttrs} + {...transferAttrs} />
); diff --git a/kafka-manager-console/src/container/modal/admin/index.less b/kafka-manager-console/src/container/modal/admin/index.less new file mode 100644 index 00000000..b741039f --- /dev/null +++ b/kafka-manager-console/src/container/modal/admin/index.less @@ -0,0 +1,8 @@ +.ant-table-wrapper.no-lr-padding { + padding-left: 0!important; + padding-right: 0!important; +} + +.no-table-header .ant-table-header { + display: none; +} \ No newline at end of file diff --git a/kafka-manager-console/src/container/topic/topic-detail/connect-information.tsx b/kafka-manager-console/src/container/topic/topic-detail/connect-information.tsx index 1e5ab182..0b6c44b3 100644 --- a/kafka-manager-console/src/container/topic/topic-detail/connect-information.tsx +++ b/kafka-manager-console/src/container/topic/topic-detail/connect-information.tsx @@ -6,6 +6,8 @@ import { Table, Tooltip } from 'component/antd'; import { SearchAndFilterContainer } from 'container/search-filter'; import Url from 'lib/url-parser'; import { pagination, cellStyle } from 'constants/table'; +import moment = require('moment'); +import { timeFormat } from 'constants/strategy'; @observer export class ConnectInformation extends SearchAndFilterContainer { @@ -27,44 +29,70 @@ export class ConnectInformation extends SearchAndFilterContainer { title: '客户端类型', dataIndex: 'clientType', key: 'clientType', - width: '20%', + width: 130, filters: [{ text: '消费', value: 'consumer' }, { text: '生产', value: 'produce' }], onFilter: (value: string, record: IConnectionInfo) => record.clientType.indexOf(value) === 0, render: (t: string) => {t === 'consumer' ? '消费' : '生产'}, }, this.renderColumnsFilter('filterVisible')); - const columns = [{ - title: 'AppID', - dataIndex: 'appId', - key: 'appId', - width: '20%', - sorter: (a: IConnectionInfo, b: IConnectionInfo) => a.appId.charCodeAt(0) - b.appId.charCodeAt(0), - }, - { - title: '主机名', - dataIndex: 'hostname', - key: 'hostname', - width: '40%', - onCell: () => ({ - style: { - maxWidth: 250, - ...cellStyle, - }, - }), - render: (t: string) => { - return ( - {t} - ); + const columns = [ + { + title: 'AppID', + dataIndex: 'appId', + key: 'appId', + width: '30%', + sorter: (a: IConnectionInfo, b: IConnectionInfo) => a.appId.charCodeAt(0) - b.appId.charCodeAt(0), + }, + { + title: 'clientID', + dataIndex: 'clientId', + key: 'clientId', + width: '30%', + onCell: () => ({ + style: { + maxWidth: 250, + ...cellStyle, + }, + }), + render: (t: string) => { + return ( + {t} + ); + }, + }, + { + title: '主机名', + dataIndex: 'hostname', + key: 'hostname', + width: '30%', + onCell: () => ({ + style: { + maxWidth: 250, + ...cellStyle, + }, + }), + render: (t: string) => { + return ( + {t} + ); + }, + }, + { + title: '客户端版本', + dataIndex: 'clientVersion', + key: 'clientVersion', + width: 130, }, - }, - { - title: '客户端版本', - dataIndex: 'clientVersion', - key: 'clientVersion', - width: '20%', - }, clientType, + { + title: '最后访问时间', + dataIndex: 'realConnectTime', + key: 'realConnectTime', + width: 170, + render: (t: number) => moment(t).format(timeFormat), + sorter: (a: IConnectionInfo, b: IConnectionInfo) => a.realConnectTime - b.realConnectTime, + }, ]; if (connectInfo) { return ( diff --git a/kafka-manager-console/src/store/topic.ts b/kafka-manager-console/src/store/topic.ts index cacb7bf4..0cfc1c2b 100644 --- a/kafka-manager-console/src/store/topic.ts +++ b/kafka-manager-console/src/store/topic.ts @@ -75,6 +75,8 @@ export interface IConnectionInfo { hostname: string; ip: string; topicName: string; + clientId?: string; + 
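// "last seen" timestamp in epoch millis, rendered as 最后访问时间 via moment(t).format(timeFormat)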
realConnectTime?: number; key?: number; } diff --git a/kafka-manager-console/webpack.config.js b/kafka-manager-console/webpack.config.js index 1608de20..6401e6c4 100644 --- a/kafka-manager-console/webpack.config.js +++ b/kafka-manager-console/webpack.config.js @@ -130,7 +130,11 @@ module.exports = { historyApiFallback: true, proxy: { '/api/v1/': { - target: 'http://127.0.0.1:8080/', + // target: 'http://117.51.150.133:8080/', + target: 'http://10.190.55.249:8080/', + // target: 'http://10.179.37.199:8008', + // target: 'http://10.179.148.210:8080', + // target: 'http://99.11.45.164:8888', changeOrigin: true, } }, diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/HaAppManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/HaAppManager.java index c1a480a5..8c2e38b6 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/HaAppManager.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/HaAppManager.java @@ -4,13 +4,15 @@ import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.vo.rd.app.AppRelateTopicsVO; import java.util.List; +import java.util.Set; /** * Ha App管理 */ public interface HaAppManager { - Result> appRelateTopics(Long clusterPhyId, List filterTopicNameList); + Result> appRelateTopics(Boolean ha, Long clusterPhyId, List filterTopicNameList); + Result> appAndClientRelateTopics(Long clusterPhyId, Set filterTopicNameSet); boolean isContainAllRelateAppTopics(Long clusterPhyId, List filterTopicNameList); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/HaTopicManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/HaTopicManager.java index b9755e55..60767c61 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/HaTopicManager.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/HaTopicManager.java @@ -3,6 +3,7 @@ package com.xiaojukeji.kafka.manager.service.biz.ha; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.TopicOperationResult; import com.xiaojukeji.kafka.manager.common.entity.ao.ha.HaSwitchTopic; +import com.xiaojukeji.kafka.manager.common.entity.dto.ha.KafkaUserAndClientDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.HaTopicRelationDTO; import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.JobLogDO; @@ -37,6 +38,7 @@ public interface HaTopicManager { Result switchHaWithCanRetry(Long newActiveClusterPhyId, Long newStandbyClusterPhyId, List switchTopicNameList, + List kafkaUserAndClientIdList, boolean focus, boolean firstTriggerExecute, JobLogDO switchLogTemplate, diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/impl/HaAppManagerImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/impl/HaAppManagerImpl.java index 19ffc5ae..06324f4d 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/impl/HaAppManagerImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/impl/HaAppManagerImpl.java @@ -1,15 +1,25 @@ package com.xiaojukeji.kafka.manager.service.biz.ha.impl; import com.xiaojukeji.kafka.manager.common.bizenum.ha.HaResTypeEnum; +import com.xiaojukeji.kafka.manager.common.constant.ConfigConstant; +import 
com.xiaojukeji.kafka.manager.common.constant.Constant; import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO; import com.xiaojukeji.kafka.manager.common.entity.vo.rd.app.AppRelateTopicsVO; +import com.xiaojukeji.kafka.manager.common.utils.FutureUtil; import com.xiaojukeji.kafka.manager.service.biz.ha.HaAppManager; +import com.xiaojukeji.kafka.manager.service.service.ConfigService; +import com.xiaojukeji.kafka.manager.service.service.TopicManagerService; import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService; +import com.xiaojukeji.kafka.manager.service.service.gateway.TopicConnectionService; import com.xiaojukeji.kafka.manager.service.service.ha.HaASRelationService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @@ -22,17 +32,45 @@ public class HaAppManagerImpl implements HaAppManager { @Autowired private HaASRelationService haASRelationService; + @Autowired + private TopicConnectionService topicConnectionService; + + @Autowired + private ConfigService configService; + + @Autowired + private TopicManagerService topicManagerService; + + private static final FutureUtil>> ConnectionsSearchTP = FutureUtil.init( + "ConnectionsSearchTP", + 5, + 5, + 500 + ); + @Override - public Result> appRelateTopics(Long clusterPhyId, List filterTopicNameList) { + public Result> appRelateTopics(Boolean ha, Long clusterPhyId, List filterTopicNameList) { // 获取关联的Topic列表 Map> userTopicMap = this.appRelateTopicsMap(clusterPhyId, filterTopicNameList); + Map> appClientSetMap = haASRelationService.listAllHAClient(clusterPhyId, userTopicMap.keySet()); + // 获取集群已建立HA的Topic列表 Set haTopicNameSet = haASRelationService.listAllHAFromDB(clusterPhyId, HaResTypeEnum.TOPIC) .stream() .map(elem -> elem.getActiveResName()) .collect(Collectors.toSet()); + Set topicNameSet = null; + if (ha) { + topicNameSet = haTopicNameSet; + }else { + List topicDOS = topicManagerService.getByClusterId(clusterPhyId); + topicNameSet = topicDOS.stream() + .filter(topicBizPO -> !haTopicNameSet.contains(topicBizPO.getTopicName())) + .map(TopicDO::getTopicName).collect(Collectors.toSet()); + } + Set filterTopicNameSet = new HashSet<>(filterTopicNameList); List voList = new ArrayList<>(); @@ -40,16 +78,18 @@ public class HaAppManagerImpl implements HaAppManager { AppRelateTopicsVO vo = new AppRelateTopicsVO(); vo.setClusterPhyId(clusterPhyId); vo.setKafkaUser(entry.getKey()); + vo.setHaClientIdList(new ArrayList<>(appClientSetMap.getOrDefault(entry.getKey(), new HashSet<>()))); vo.setSelectedTopicNameList(new ArrayList<>()); vo.setNotSelectTopicNameList(new ArrayList<>()); vo.setNotHaTopicNameList(new ArrayList<>()); + Set finalTopicNameSet = topicNameSet; entry.getValue().forEach(elem -> { if (elem.startsWith("__")) { // ignore return; } - if (!haTopicNameSet.contains(elem)) { + if (!finalTopicNameSet.contains(elem)) { vo.getNotHaTopicNameList().add(elem); } else if (filterTopicNameSet.contains(elem)) { vo.getSelectedTopicNameList().add(elem); @@ -64,6 +104,104 @@ public class HaAppManagerImpl implements HaAppManager { return Result.buildSuc(voList); } + @Override + public Result> appAndClientRelateTopics(Long clusterPhyId, Set 
filterTopicNameSet) { + List haASRelationDOList = haASRelationService.listAllHAFromDB(clusterPhyId, HaResTypeEnum.CLUSTER); + Long secondClusterId = null; + for (HaASRelationDO asRelationDO: haASRelationDOList) { + if (clusterPhyId.equals(asRelationDO.getActiveClusterPhyId())) { + secondClusterId = asRelationDO.getStandbyClusterPhyId(); + } else { + secondClusterId = asRelationDO.getActiveClusterPhyId(); + } + + break; + } + + Map/*ClientID*/>>> connectionsResultMap = new ConcurrentHashMap<>(); + + // 生效时间 + Long activeMin = configService.getLongValue(ConfigConstant.HA_CONNECTION_ACTIVE_TIME_UNIT_MIN, Constant.TOPIC_CONNECTION_LATEST_TIME_MS / 1000 / 60); + + // 获取Topic关联的连接 + for (String topicName: filterTopicNameSet) { + Long tempSecondClusterId = secondClusterId; + ConnectionsSearchTP.runnableTask( + String.format("clusterPhyId=%d||topicName=%s", clusterPhyId, topicName), + 10000, + () -> { + Result>> userAndClientMapResult = topicConnectionService.getHaKafkaUserAndClientIdByTopicName( + clusterPhyId, + tempSecondClusterId, + topicName, + new Date(System.currentTimeMillis() - activeMin * 60L * 1000L), + new Date() + ); + + connectionsResultMap.put(topicName, userAndClientMapResult); + } + ); + + ConnectionsSearchTP.waitExecute(10000); + } + + // 因为接口比较重要,只要一出现异常,则直接返回错误 + for (Result>> valueResult: connectionsResultMap.values()) { + if (valueResult.failed()) { + return Result.buildFromIgnoreData(valueResult); + } + } + + // 查询结果转Map + Map/*ClientID*/> kafkaUserAndClientMap = new HashMap<>(); + for (Result>> valueResult: connectionsResultMap.values()) { + for (Map.Entry> entry: valueResult.getData().entrySet()) { + kafkaUserAndClientMap.putIfAbsent(entry.getKey(), new HashSet<>()); + kafkaUserAndClientMap.get(entry.getKey()).addAll(entry.getValue()); + } + } + + // 获取集群已建立HA的Topic列表 + Set haTopicNameSet = haASRelationService.listAllHAFromDB(clusterPhyId, HaResTypeEnum.TOPIC) + .stream() + .map(elem -> elem.getActiveResName()) + .collect(Collectors.toSet()); + + // 获取KafkaUser+Client下的Topic列表 + List voList = new ArrayList<>(); + for (Map.Entry> entry: kafkaUserAndClientMap.entrySet()) { + Long tempSecondClusterId = secondClusterId; + ConnectionsSearchTP.runnableTask( + "", + 10000, + () -> { + Result> doListResult = topicConnectionService.getByClusterAndAppId( + clusterPhyId, + tempSecondClusterId, + entry.getKey(), + new Date(System.currentTimeMillis() - activeMin * 60L * 1000L), + new Date() + ); + if (doListResult.failed()) { + return Result.buildFromIgnoreData(doListResult); + } + + return Result.buildSuc(convert2VOList(clusterPhyId, entry.getValue(), doListResult.getData(), haTopicNameSet, filterTopicNameSet)); + } + ); + + for (Result> elem: ConnectionsSearchTP.waitResult(10000)) { + if (elem.failed()) { + Result.buildFromIgnoreData(elem); + } + + voList.addAll(elem.getData()); + } + } + + return Result.buildSuc(voList); + } + @Override public boolean isContainAllRelateAppTopics(Long clusterPhyId, List filterTopicNameList) { Map> userTopicMap = this.appRelateTopicsMap(clusterPhyId, filterTopicNameList); @@ -91,4 +229,41 @@ public class HaAppManagerImpl implements HaAppManager { return userTopicMap; } + + private List convert2VOList(Long clusterPhyId, + Set clientIdSet, + List connectionList, + Set haTopicNameSet, + Set filterTopicNameSet) { + Map voMap = new HashMap<>(); + for (TopicConnectionDO connection: connectionList) { + if (connection.getTopicName().startsWith("__")) { + // 忽略系统内部Topic + continue; + } + + if (!clientIdSet.contains("") && 
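                // an empty-string entry in clientIdSet means "match every clientId of this kafkaUser"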
!clientIdSet.contains(connection.getClientId())) { + continue; + } + + AppRelateTopicsVO vo = voMap.get(connection.getClientId()); + if (vo == null) { + vo = new AppRelateTopicsVO(clusterPhyId, connection.getAppId(), connection.getClientId()); + } + + if (!haTopicNameSet.contains(connection.getTopicName())) { + vo.addNotHaIfNotExist(connection.getTopicName()); + } + + if (!filterTopicNameSet.contains(connection.getTopicName())) { + vo.addNotSelectedIfNotExist(connection.getTopicName()); + } else { + vo.addSelectedIfNotExist(connection.getTopicName()); + } + + voMap.put(connection.getClientId(), vo); + } + + return new ArrayList<>(voMap.values()); + } } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/impl/HaTopicManagerImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/impl/HaTopicManagerImpl.java index d1224a4b..b57ab928 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/impl/HaTopicManagerImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/impl/HaTopicManagerImpl.java @@ -7,6 +7,7 @@ import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.TopicOperationResult; import com.xiaojukeji.kafka.manager.common.entity.ao.ha.HaSwitchTopic; +import com.xiaojukeji.kafka.manager.common.entity.dto.ha.KafkaUserAndClientDTO; import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.HaTopicRelationDTO; import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO; @@ -14,13 +15,14 @@ import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.JobLogDO; import com.xiaojukeji.kafka.manager.common.utils.BackoffUtils; import com.xiaojukeji.kafka.manager.common.utils.ConvertUtil; +import com.xiaojukeji.kafka.manager.common.utils.HAUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.service.biz.ha.HaTopicManager; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; +import com.xiaojukeji.kafka.manager.service.service.AdminService; import com.xiaojukeji.kafka.manager.service.service.ClusterService; import com.xiaojukeji.kafka.manager.service.service.JobLogService; import com.xiaojukeji.kafka.manager.service.service.TopicManagerService; -import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService; import com.xiaojukeji.kafka.manager.service.service.ha.HaASRelationService; import com.xiaojukeji.kafka.manager.service.service.ha.HaKafkaUserService; import com.xiaojukeji.kafka.manager.service.service.ha.HaTopicService; @@ -28,6 +30,7 @@ import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Component; import java.util.*; @@ -40,9 +43,6 @@ public class HaTopicManagerImpl implements HaTopicManager { @Autowired private ClusterService clusterService; - @Autowired - private AuthorityService authorityService; - @Autowired private HaTopicService haTopicService; @@ -61,10 +61,14 @@ public class HaTopicManagerImpl implements HaTopicManager { @Autowired private JobLogService jobLogService; + 
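    // AdminService is used below to physically delete the standby topic when retainStandbyResource is false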
@Autowired + private AdminService adminService; + @Override public Result switchHaWithCanRetry(Long newActiveClusterPhyId, Long newStandbyClusterPhyId, List switchTopicNameList, + List kafkaUserAndClientIdList, boolean focus, boolean firstTriggerExecute, JobLogDO switchLogTemplate, @@ -106,7 +110,7 @@ public class HaTopicManagerImpl implements HaTopicManager { } // 4、进行切换预处理 - HaSwitchTopic switchTopic = this.prepareSwitching(newStandbyClusterPhyDO, doList, focus, switchLogTemplate); + HaSwitchTopic switchTopic = this.prepareSwitching(newStandbyClusterPhyDO, doList, kafkaUserAndClientIdList, focus, switchLogTemplate); // 5、直接等待10秒,使得相关数据有机会同步完成 BackoffUtils.backoff(10000); @@ -125,7 +129,15 @@ public class HaTopicManagerImpl implements HaTopicManager { switchTopic.addHaSwitchTopic(this.newStandbyTopicAddFetchConfig(newActiveClusterPhyDO, newStandbyClusterPhyDO, doList, focus, switchLogTemplate, operator)); // 9、进行切换收尾 - switchTopic.addHaSwitchTopic(this.closeoutSwitching(newActiveClusterPhyDO, newStandbyClusterPhyDO, configUtils.getDKafkaGatewayZK(), doList, focus, switchLogTemplate)); + switchTopic.addHaSwitchTopic(this.closeoutSwitching( + newActiveClusterPhyDO, + newStandbyClusterPhyDO, + configUtils.getDKafkaGatewayZK(), + doList, + kafkaUserAndClientIdList, + focus, + switchLogTemplate + )); // 10、状态结果汇总记录 doList.forEach(elem -> switchTopic.addActiveTopicStatus(elem.getActiveResName(), elem.getStatus())); @@ -136,6 +148,18 @@ public class HaTopicManagerImpl implements HaTopicManager { newActiveClusterPhyId, newStandbyClusterPhyId, ConvertUtil.obj2Json(switchTopicNameList), switchTopic, operator ); + if (switchTopic.isFinished()) { + // 全都切换完成,则更新HA信息 + try { + updateHAClient(newActiveClusterPhyId, newStandbyClusterPhyId, kafkaUserAndClientIdList); + } catch (Exception e) { + LOGGER.error( + "method=switchHaWithCanRetry||newActiveClusterPhyId={}||newStandbyClusterPhyId={}||kafkaUserAndClientIdList={}||operator={}||errMsg=exception", + newActiveClusterPhyId, newStandbyClusterPhyId, ConvertUtil.obj2Json(kafkaUserAndClientIdList), operator, e + ); + } + } + return Result.buildSuc(switchTopic); } @@ -188,6 +212,20 @@ public class HaTopicManagerImpl implements HaTopicManager { } Result rv = haTopicService.deleteHA(relationDO.getActiveClusterPhyId(), relationDO.getStandbyClusterPhyId(), topicName, operator); + + //删除备topic资源 + if (dto.getRetainStandbyResource() != null && !dto.getRetainStandbyResource()) { + ResultStatus statusEnum = adminService.deleteTopic( + PhysicalClusterMetadataManager.getClusterFromCache(dto.getStandbyClusterId()), + topicName, + operator); + if (statusEnum.getCode() != ResultStatus.SUCCESS.getCode()){ + LOGGER.error( + "method=batchRemoveHaTopic||activeClusterPhyId={}||standbyClusterPhyId={}||topicName={}||result={}||msg=delete standby topic failed.", + dto.getActiveClusterId(), dto.getStandbyClusterId(), topicName, statusEnum + ); + } + } operationResultList.add(TopicOperationResult.buildFrom(dto.getActiveClusterId(), topicName, rv)); } @@ -200,58 +238,43 @@ public class HaTopicManagerImpl implements HaTopicManager { jobLogService.addLogAndIgnoreException(switchLogTemplate.setAndCopyNew(new Date(), content)); } - /** - * 切换预处理 - * 1、在主集群上,将Topic关联的KafkaUser的active集群设置为None - */ - private HaSwitchTopic prepareSwitching(ClusterDO oldActiveClusterPhyDO, List doList, boolean focus, JobLogDO switchLogTemplate) { - // 暂停HA的KafkaUser - Set stoppedHaKafkaUserSet = new HashSet<>(); - + private HaSwitchTopic prepareSwitching(ClusterDO oldActiveClusterPhyDO, + List doList, + 
List clientDTOList, + boolean focus, + JobLogDO switchLogTemplate) { HaSwitchTopic haSwitchTopic = new HaSwitchTopic(true); boolean allSuccess = true; // 所有都成功 - boolean needLog = false; // 需要记录日志 - for (HaASRelationDO relationDO: doList) { - if (!relationDO.getStatus().equals(HaStatusEnum.SWITCHING_PREPARE_CODE)) { - // 当前不处于prepare状态 - haSwitchTopic.setFinished(true); - continue; - } - needLog = true; - // 获取关联的KafkaUser - Set relatedKafkaUserSet = authorityService.getAuthorityByTopic(relationDO.getActiveClusterPhyId(), relationDO.getActiveResName()) - .stream() - .map(elem -> elem.getAppId()) - .filter(kafkaUser -> !stoppedHaKafkaUserSet.contains(kafkaUser)) - .collect(Collectors.toSet()); - - // 暂停kafkaUser HA - for (String kafkaUser: relatedKafkaUserSet) { - Result rv = haKafkaUserService.setNoneHAInKafka(oldActiveClusterPhyDO.getZookeeper(), kafkaUser); - if (rv.failed() && !focus) { - haSwitchTopic.setFinished(false); - - this.saveLogs(switchLogTemplate, String.format("%s:\t失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_PREPARE.getMsg(oldActiveClusterPhyDO.getClusterName()))); - return haSwitchTopic; - } else if (rv.failed() && focus) { - allSuccess = false; - } - } - - // 记录操作过的user - stoppedHaKafkaUserSet.addAll(relatedKafkaUserSet); - - // 修改Topic主备状态 - relationDO.setStatus(HaStatusEnum.SWITCHING_WAITING_IN_SYNC_CODE); - haASRelationService.updateRelationStatus(relationDO.getId(), HaStatusEnum.SWITCHING_WAITING_IN_SYNC_CODE); + // 存在prepare状态的,则就需要进行预处理操作 + boolean needDOIt = doList.stream().filter(elem -> elem.getStatus().equals(HaStatusEnum.SWITCHING_PREPARE_CODE)).count() > 0; + if (!needDOIt) { + // 不需要做 + return haSwitchTopic; } - if (needLog) { - this.saveLogs(switchLogTemplate, String.format("%s:\t%s", HaStatusEnum.SWITCHING_PREPARE.getMsg(oldActiveClusterPhyDO.getClusterName()), allSuccess? "成功": "存在失败,但进行强制执行,跳过该操作")); + // 暂停kafkaUser HA + for (KafkaUserAndClientDTO dto: clientDTOList) { + Result rv = haKafkaUserService.setNoneHAInKafka(oldActiveClusterPhyDO.getZookeeper(), HAUtils.mergeKafkaUserAndClient(dto.getKafkaUser(), dto.getClientId())); + if (rv.failed() && !focus) { + haSwitchTopic.setFinished(false); + + this.saveLogs(switchLogTemplate, String.format("%s:\t失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_PREPARE.getMsg(oldActiveClusterPhyDO.getClusterName()))); + return haSwitchTopic; + } else if (rv.failed() && focus) { + allSuccess = false; + } } + // 修改Topic主备状态 + doList.forEach(elem -> { + elem.setStatus(HaStatusEnum.SWITCHING_WAITING_IN_SYNC_CODE); + haASRelationService.updateRelationStatus(elem.getId(), HaStatusEnum.SWITCHING_WAITING_IN_SYNC_CODE); + }); + + this.saveLogs(switchLogTemplate, String.format("%s:\t%s", HaStatusEnum.SWITCHING_PREPARE.getMsg(oldActiveClusterPhyDO.getClusterName()), allSuccess? 
"成功": "存在失败,但进行强制执行,跳过该操作")); + haSwitchTopic.setFinished(true); return haSwitchTopic; } @@ -412,87 +435,76 @@ public class HaTopicManagerImpl implements HaTopicManager { * 2、原先的备集群-修改user的active集群,指向新的主集群 * 3、网关-修改user的active集群,指向新的主集群 */ - private HaSwitchTopic closeoutSwitching(ClusterDO newActiveClusterPhyDO, ClusterDO newStandbyClusterPhyDO, String gatewayZK, List doList, boolean focus, JobLogDO switchLogTemplate) { - // 暂停HA的KafkaUser - Set activeHaKafkaUserSet = new HashSet<>(); + private HaSwitchTopic closeoutSwitching(ClusterDO newActiveClusterPhyDO, + ClusterDO newStandbyClusterPhyDO, + String gatewayZK, + List doList, + List kafkaUserAndClientDTOList, + boolean focus, + JobLogDO switchLogTemplate) { + HaSwitchTopic haSwitchTopic = new HaSwitchTopic(true); + + boolean needDOIt = doList.stream().filter(elem -> elem.getStatus().equals(HaStatusEnum.SWITCHING_CLOSEOUT_CODE)).count() > 0; + if (!needDOIt) { + // 不需要做任何事情 + return haSwitchTopic; + } boolean allSuccess = true; - boolean needLog = false; boolean forceAndNewStandbyFailed = false; // 强制切换,但是新的备依旧操作失败 - HaSwitchTopic haSwitchTopic = new HaSwitchTopic(true); - for (HaASRelationDO relationDO: doList) { - if (!relationDO.getStatus().equals(HaStatusEnum.SWITCHING_CLOSEOUT_CODE)) { - // 当前不处于closeout状态 + for (KafkaUserAndClientDTO dto: kafkaUserAndClientDTOList) { + String zkNodeName = HAUtils.mergeKafkaUserAndClient(dto.getKafkaUser(), dto.getClientId()); + + // 操作新的主集群 + Result rv = haKafkaUserService.activeHAInKafka(newActiveClusterPhyDO.getZookeeper(), newActiveClusterPhyDO.getId(), zkNodeName); + if (rv.failed() && !focus) { haSwitchTopic.setFinished(false); - continue; + this.saveLogs(switchLogTemplate, String.format("%s:\t失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName()))); + return haSwitchTopic; + } else if (rv.failed() && focus) { + allSuccess = false; } - needLog = true; - - // 获取关联的KafkaUser - Set relatedKafkaUserSet = authorityService.getAuthorityByTopic(relationDO.getActiveClusterPhyId(), relationDO.getActiveResName()) - .stream() - .map(elem -> elem.getAppId()) - .filter(kafkaUser -> !activeHaKafkaUserSet.contains(kafkaUser)) - .collect(Collectors.toSet()); - - for (String kafkaUser: relatedKafkaUserSet) { - // 操作新的主集群 - Result rv = haKafkaUserService.activeHAInKafka(newActiveClusterPhyDO.getZookeeper(), newActiveClusterPhyDO.getId(), kafkaUser); - if (rv.failed() && !focus) { - haSwitchTopic.setFinished(false); - this.saveLogs(switchLogTemplate, String.format("%s:\t失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName()))); - return haSwitchTopic; - } else if (rv.failed() && focus) { - allSuccess = false; - } - - // 操作新的备集群,如果出现错误,则下次就不再进行操作ZK。新的备的Topic不是那么重要,因此这里允许出现跳过 - rv = null; - if (!forceAndNewStandbyFailed) { - // 如果对备集群的操作过程中,出现了失败,则直接跳过 - rv = haKafkaUserService.activeHAInKafka(newStandbyClusterPhyDO.getZookeeper(), newActiveClusterPhyDO.getId(), kafkaUser); - } - - if (rv != null && rv.failed() && !focus) { - haSwitchTopic.setFinished(false); - this.saveLogs(switchLogTemplate, String.format("%s:\t失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName()))); - return haSwitchTopic; - } else if (rv != null && rv.failed() && focus) { - allSuccess = false; - forceAndNewStandbyFailed = true; - } - - // 操作网关 - rv = haKafkaUserService.activeHAInKafka(gatewayZK, newActiveClusterPhyDO.getId(), kafkaUser); - if (rv.failed() && !focus) { - haSwitchTopic.setFinished(false); - 
this.saveLogs(switchLogTemplate, String.format("%s:\t失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName()))); - return haSwitchTopic; - } else if (rv.failed() && focus) { - allSuccess = false; - } + // 操作新的备集群,如果出现错误,则下次就不再进行操作ZK。新的备的Topic不是那么重要,因此这里允许出现跳过 + rv = null; + if (!forceAndNewStandbyFailed) { + // 如果对备集群的操作过程中,出现了失败,则直接跳过 + rv = haKafkaUserService.activeHAInKafka(newStandbyClusterPhyDO.getZookeeper(), newActiveClusterPhyDO.getId(), zkNodeName); } - // 记录已经激活的User - activeHaKafkaUserSet.addAll(relatedKafkaUserSet); + if (rv != null && rv.failed() && !focus) { + haSwitchTopic.setFinished(false); + this.saveLogs(switchLogTemplate, String.format("%s:\t失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName()))); + return haSwitchTopic; + } else if (rv != null && rv.failed() && focus) { + allSuccess = false; + forceAndNewStandbyFailed = true; + } - // 修改Topic主备信息 + // 操作网关 + rv = haKafkaUserService.activeHAInKafka(gatewayZK, newActiveClusterPhyDO.getId(), zkNodeName); + if (rv.failed() && !focus) { + haSwitchTopic.setFinished(false); + this.saveLogs(switchLogTemplate, String.format("%s:\t失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName()))); + return haSwitchTopic; + } else if (rv.failed() && focus) { + allSuccess = false; + } + } + + // 修改Topic主备信息 + doList.forEach(elem -> { HaASRelationDO newHaASRelationDO = new HaASRelationDO( - newActiveClusterPhyDO.getId(), relationDO.getActiveResName(), - newStandbyClusterPhyDO.getId(), relationDO.getStandbyResName(), + newActiveClusterPhyDO.getId(), elem.getActiveResName(), + newStandbyClusterPhyDO.getId(), elem.getStandbyResName(), HaResTypeEnum.TOPIC.getCode(), HaStatusEnum.STABLE_CODE ); - newHaASRelationDO.setId(relationDO.getId()); + newHaASRelationDO.setId(elem.getId()); haASRelationService.updateById(newHaASRelationDO); - } - - if (!needLog) { - return haSwitchTopic; - } + }); this.saveLogs(switchLogTemplate, String.format("%s:\t%s", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName()), allSuccess? 
"成功": "存在失败,但进行强制执行,跳过该操作")); return haSwitchTopic; @@ -556,4 +568,45 @@ public class HaTopicManagerImpl implements HaTopicManager { return Result.buildSuc(relationDO); } + + private void updateHAClient(Long newActiveClusterPhyId, + Long newStandbyClusterPhyId, + List kafkaUserAndClientIdList) { + if (ValidateUtils.isEmptyList(kafkaUserAndClientIdList)) { + return; + } + + List doList = haASRelationService.listAllHAFromDB(newActiveClusterPhyId, HaResTypeEnum.KAFKA_USER_AND_CLIENT); + + Map resNameMap = new HashMap<>(); + doList.forEach(elem -> resNameMap.put(elem.getActiveResName(), elem)); + + for (KafkaUserAndClientDTO dto: kafkaUserAndClientIdList) { + if (ValidateUtils.isBlank(dto.getClientId())) { + continue; + } + String resName = HAUtils.mergeKafkaUserAndClient(dto.getKafkaUser(), dto.getClientId()); + + HaASRelationDO newDO = new HaASRelationDO( + newActiveClusterPhyId, + resName, + newStandbyClusterPhyId, + resName, + HaResTypeEnum.KAFKA_USER_AND_CLIENT.getCode(), + HaStatusEnum.STABLE_CODE + ); + + HaASRelationDO oldDO = resNameMap.remove(resName); + if (oldDO != null) { + newDO.setId(oldDO.getId()); + haASRelationService.updateById(newDO); + } else { + try { + haASRelationService.addHAToDB(newDO); + } catch (DuplicateKeyException dke) { + // ignore + } + } + } + } } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/job/impl/HaASSwitchJobManagerImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/job/impl/HaASSwitchJobManagerImpl.java index 86f68c3f..3ad8aea4 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/job/impl/HaASSwitchJobManagerImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/job/impl/HaASSwitchJobManagerImpl.java @@ -22,10 +22,7 @@ import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASSwitchJobDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASSwitchSubJobDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.JobLogDO; import com.xiaojukeji.kafka.manager.common.entity.vo.ha.job.HaJobDetailVO; -import com.xiaojukeji.kafka.manager.common.utils.BackoffUtils; -import com.xiaojukeji.kafka.manager.common.utils.ConvertUtil; -import com.xiaojukeji.kafka.manager.common.utils.FutureUtil; -import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; +import com.xiaojukeji.kafka.manager.common.utils.*; import com.xiaojukeji.kafka.manager.service.biz.ha.HaAppManager; import com.xiaojukeji.kafka.manager.service.biz.ha.HaTopicManager; import com.xiaojukeji.kafka.manager.service.biz.job.HaASSwitchJobManager; @@ -95,19 +92,20 @@ public class HaASSwitchJobManagerImpl implements HaASSwitchJobManager { LOGGER.info("method=createJob||activeClusterPhyId={}||switchTopics={}||operator={}", dto.getActiveClusterPhyId(), ConvertUtil.obj2Json(haTopicSetResult.getData()), operator); - // 2、查看是否将KafkaUser关联的Topic都涵盖了 - if (dto.getMustContainAllKafkaUserTopics() != null - && dto.getMustContainAllKafkaUserTopics() - && (dto.getAll() == null || !dto.getAll()) - && !haAppManager.isContainAllRelateAppTopics(dto.getActiveClusterPhyId(), dto.getTopicNameList())) { - return Result.buildFromRSAndMsg(ResultStatus.OPERATION_FORBIDDEN, "存在KafkaUser关联的Topic未选中"); - } +// // 2、查看是否将KafkaUser关联的Topic都涵盖了 +// if (dto.getMustContainAllKafkaUserTopics() != null +// && dto.getMustContainAllKafkaUserTopics() +// && (dto.getAll() == null || !dto.getAll()) +// && !haAppManager.isContainAllRelateAppTopics(dto.getActiveClusterPhyId(), 
dto.getTopicNameList())) { +// return Result.buildFromRSAndMsg(ResultStatus.OPERATION_FORBIDDEN, "存在KafkaUser关联的Topic未选中"); +// } // 3、创建任务 Result longResult = haASSwitchJobService.createJob( dto.getActiveClusterPhyId(), dto.getStandbyClusterPhyId(), new ArrayList<>(haTopicSetResult.getData()), + dto.getKafkaUserAndClientIdList(), operator ); if (longResult.failed()) { @@ -176,6 +174,7 @@ public class HaASSwitchJobManagerImpl implements HaASSwitchJobManager { jobDO.getActiveClusterPhyId(), jobDO.getStandbyClusterPhyId(), subJobDOList.stream().map(elem -> elem.getActiveResName()).collect(Collectors.toList()), + jobDO.getExtendRawData(), focus, firstTriggerExecute, new JobLogDO(JobLogBizTypEnum.HA_SWITCH_JOB_LOG.getCode(), String.valueOf(jobId)), diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/TopicConnectionService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/TopicConnectionService.java index f73ff8d5..891d3835 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/TopicConnectionService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/TopicConnectionService.java @@ -1,10 +1,13 @@ package com.xiaojukeji.kafka.manager.service.service.gateway; +import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection; import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO; import java.util.Date; import java.util.List; +import java.util.Map; +import java.util.Set; /** * @author zhongyuankai @@ -21,6 +24,14 @@ public interface TopicConnectionService { Date startTime, Date endTime); + Result/*ClientID*/>> getHaKafkaUserAndClientIdByTopicName(Long firstClusterId, + Long secondClusterId, + String topicName, + Date startTime, + Date endTime); + + Set getKafkaUserAndClientIdTopicNames(Set clusterIdSet, String kafkaUser, String clientId, Date startTime, Date endTime); + /** * 查询连接信息 */ @@ -37,6 +48,8 @@ public interface TopicConnectionService { Date startTime, Date endTime); + Result> getByClusterAndAppId(Long firstClusterId, Long secondClusterId, String appId, Date startTime, Date endTime); + /** * 判断topic是否存在连接 */ diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java index 4c9a5528..54002df2 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/gateway/impl/TopicConnectionServiceImpl.java @@ -1,6 +1,8 @@ package com.xiaojukeji.kafka.manager.service.service.gateway.impl; import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum; +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO; import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection; import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant; @@ -67,6 +69,71 @@ public class TopicConnectionServiceImpl implements TopicConnectionService { return getByTopicName(clusterId, doList); } + @Override + public Result>> 
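    // aggregates connections from both clusters, keeping only HA-style clientIds (P#/C# prefix), keyed by kafkaUser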
getHaKafkaUserAndClientIdByTopicName(Long firstClusterId, + Long secondClusterId, + String topicName, + Date startTime, + Date endTime) { + List doList = new ArrayList<>(); + try { + if (firstClusterId != null) { + doList.addAll(topicConnectionDao.getByTopicName(firstClusterId, topicName, startTime, endTime)); + } + } catch (Exception e) { + LOGGER.error("get topic connections failed, firstClusterId:{} topicName:{}.", firstClusterId, topicName, e); + + return Result.buildFromRSAndMsg(ResultStatus.MYSQL_ERROR, e.getMessage()); + } + + try { + if (secondClusterId != null) { + doList.addAll(topicConnectionDao.getByTopicName(secondClusterId, topicName, startTime, endTime)); + } + } catch (Exception e) { + LOGGER.error("get topic connections failed, secondClusterId:{} topicName:{}.", secondClusterId, topicName, e); + + return Result.buildFromRSAndMsg(ResultStatus.MYSQL_ERROR, e.getMessage()); + } + + if (ValidateUtils.isEmptyList(doList)) { + return Result.buildSuc(new HashMap<>()); + } + + Map> userAndClientMap = new HashMap<>(); + for (TopicConnectionDO po: doList) { + if (!po.getClientId().startsWith("P#") && !po.getClientId().startsWith("C#")) { + // 忽略非HA的clientId + continue; + } + + userAndClientMap.putIfAbsent(po.getAppId(), new HashSet<>()); + userAndClientMap.get(po.getAppId()).add(po.getClientId()); + } + + return Result.buildSuc(userAndClientMap); + } + + @Override + public Set getKafkaUserAndClientIdTopicNames(Set clusterIdSet, String kafkaUser, String clientId, Date startTime, Date endTime) { + List doList = null; + try { + doList = topicConnectionDao.getByAppId(kafkaUser, startTime, endTime); + } catch (Exception e) { + LOGGER.error("get topic connections failed, kafkaUser:{}.", kafkaUser, e); + } + + if (ValidateUtils.isEmptyList(doList)) { + return new HashSet<>(); + } + + return doList + .stream() + .filter(elem -> elem.getClientId().equals(clientId) && clusterIdSet.contains(elem.getClusterId())) + .map(item -> item.getTopicName()) + .collect(Collectors.toSet()); + } + @Override public List getByTopicName(Long clusterId, String topicName, @@ -102,6 +169,36 @@ public class TopicConnectionServiceImpl implements TopicConnectionService { return getByTopicName(null, doList); } + @Override + public Result> getByClusterAndAppId(Long firstClusterId, Long secondClusterId, String appId, Date startTime, Date endTime) { + List doList = new ArrayList<>(); + try { + if (firstClusterId != null) { + doList.addAll(topicConnectionDao.getByClusterAndAppId(firstClusterId, appId, startTime, endTime)); + } + } catch (Exception e) { + LOGGER.error("get topic connections failed, firstClusterId:{} appId:{}.", firstClusterId, appId, e); + + return Result.buildFromRSAndMsg(ResultStatus.MYSQL_ERROR, e.getMessage()); + } + + try { + if (secondClusterId != null) { + doList.addAll(topicConnectionDao.getByClusterAndAppId(secondClusterId, appId, startTime, endTime)); + } + } catch (Exception e) { + LOGGER.error("get topic connections failed, secondClusterId:{} appId:{}.", secondClusterId, appId, e); + + return Result.buildFromRSAndMsg(ResultStatus.MYSQL_ERROR, e.getMessage()); + } + + if (ValidateUtils.isEmptyList(doList)) { + return Result.buildSuc(new ArrayList<>()); + } + + return Result.buildSuc(doList); + } + @Override public boolean isExistConnection(Long clusterId, String topicName, @@ -210,6 +307,10 @@ public class TopicConnectionServiceImpl implements TopicConnectionService { LOGGER.error("get hostname failed. 
ip:{}.", connectionDO.getIp(), e); } dto.setHostname(hostName.replace(KafkaConstant.BROKER_HOST_NAME_SUFFIX, "")); + + dto.setClientId(connectionDO.getClientId()); + dto.setRealConnectTime(connectionDO.getRealConnectTime()); + dto.setCreateTime(connectionDO.getCreateTime().getTime()); return dto; } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ha/HaASRelationService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ha/HaASRelationService.java index 08445182..748bc972 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ha/HaASRelationService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ha/HaASRelationService.java @@ -5,6 +5,8 @@ import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO; import java.util.List; +import java.util.Map; +import java.util.Set; public interface HaASRelationService { Result replaceTopicRelationsToDB(Long standbyClusterPhyId, List topicRelationDOList); @@ -53,6 +55,8 @@ public interface HaASRelationService { */ List listAllHAFromDB(Long firstClusterPhyId, HaResTypeEnum resTypeEnum); + Map> listAllHAClient(Long firstClusterPhyId, Set kafkaUserSet); + /** * 获取主备关系 */ diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ha/HaASSwitchJobService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ha/HaASSwitchJobService.java index 189a4ba0..cf0584d5 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ha/HaASSwitchJobService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ha/HaASSwitchJobService.java @@ -4,6 +4,7 @@ package com.xiaojukeji.kafka.manager.service.service.ha; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ao.ha.job.HaJobDetail; import com.xiaojukeji.kafka.manager.common.entity.ao.ha.job.HaSubJobExtendData; +import com.xiaojukeji.kafka.manager.common.entity.dto.ha.KafkaUserAndClientDTO; import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASSwitchJobDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASSwitchSubJobDO; @@ -14,7 +15,11 @@ public interface HaASSwitchJobService { /** * 创建任务 */ - Result createJob(Long activeClusterPhyId, Long standbyClusterPhyId, List topicNameList, String operator); + Result createJob(Long activeClusterPhyId, + Long standbyClusterPhyId, + List topicNameList, + List kafkaUserAndClientList, + String operator); /** * 更新任务状态 diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ha/impl/HaASRelationServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ha/impl/HaASRelationServiceImpl.java index 097e864e..40ec2d51 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ha/impl/HaASRelationServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ha/impl/HaASRelationServiceImpl.java @@ -6,6 +6,8 @@ import com.xiaojukeji.kafka.manager.common.bizenum.ha.HaStatusEnum; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO; +import com.xiaojukeji.kafka.manager.common.utils.HAUtils; +import 
com.xiaojukeji.kafka.manager.common.utils.Tuple; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.dao.ha.HaASRelationDao; import com.xiaojukeji.kafka.manager.service.service.ha.HaASRelationService; @@ -14,9 +16,7 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.function.Function; import java.util.stream.Collectors; @@ -177,6 +177,34 @@ public class HaASRelationServiceImpl implements HaASRelationService { return doList; } + @Override + public Map> listAllHAClient(Long firstClusterPhyId, Set kafkaUserSet) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(HaASRelationDO::getResType, HaResTypeEnum.KAFKA_USER_AND_CLIENT.getCode()); + lambdaQueryWrapper.and(lambda -> + lambda.eq(HaASRelationDO::getActiveClusterPhyId, firstClusterPhyId).or().eq(HaASRelationDO::getStandbyClusterPhyId, firstClusterPhyId) + ); + + // 查询HA列表 + List doList = haASRelationDao.selectList(lambdaQueryWrapper); + if (ValidateUtils.isNull(doList)) { + return new HashMap<>(); + } + + Map> haClientMap = new HashMap<>(); + doList.forEach(elem -> { + Tuple data = HAUtils.splitKafkaUserAndClient(elem.getActiveResName()); + if (data == null || !kafkaUserSet.contains(data.getV1())) { + return; + } + + haClientMap.putIfAbsent(data.getV1(), new HashSet<>()); + haClientMap.get(data.getV1()).add(data.getV2()); + }); + + return haClientMap; + } + @Override public List listAllHAFromDB(Long firstClusterPhyId, Long secondClusterPhyId, HaResTypeEnum resTypeEnum) { // 查询HA列表 diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ha/impl/HaASSwitchJobServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ha/impl/HaASSwitchJobServiceImpl.java index 408fcff7..9a906690 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ha/impl/HaASSwitchJobServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ha/impl/HaASSwitchJobServiceImpl.java @@ -6,6 +6,7 @@ import com.xiaojukeji.kafka.manager.common.bizenum.ha.job.HaJobStatusEnum; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.ha.job.*; +import com.xiaojukeji.kafka.manager.common.entity.dto.ha.KafkaUserAndClientDTO; import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASSwitchJobDO; import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASSwitchSubJobDO; import com.xiaojukeji.kafka.manager.common.utils.ConvertUtil; @@ -35,10 +36,22 @@ public class HaASSwitchJobServiceImpl implements HaASSwitchJobService { @Override @Transactional - public Result createJob(Long activeClusterPhyId, Long standbyClusterPhyId, List topicNameList, String operator) { + public Result createJob(Long activeClusterPhyId, + Long standbyClusterPhyId, + List topicNameList, + List kafkaUserAndClientList, + String operator) { try { // 父任务 - HaASSwitchJobDO jobDO = new HaASSwitchJobDO(activeClusterPhyId, standbyClusterPhyId, HaJobStatusEnum.RUNNING.getStatus(), operator); + HaASSwitchJobDO jobDO = new HaASSwitchJobDO( + activeClusterPhyId, + standbyClusterPhyId, + ValidateUtils.isEmptyList(kafkaUserAndClientList)? 
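                    // job type flag: 0 = topic-only switch, 1 = switch carrying a kafkaUser+clientID list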
0: 1, + kafkaUserAndClientList, + HaJobStatusEnum.RUNNING.getStatus(), + operator + ); + haASSwitchJobDao.insert(jobDO); // 子任务 diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/HaKafkaUserCommands.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/HaKafkaUserCommands.java index eeb43d87..3819b919 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/HaKafkaUserCommands.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/HaKafkaUserCommands.java @@ -6,10 +6,15 @@ import kafka.admin.AdminUtils; import kafka.admin.AdminUtils$; import kafka.server.ConfigType; import kafka.utils.ZkUtils; +import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.security.JaasUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.UnsupportedEncodingException; +import java.net.URLDecoder; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Properties; @@ -40,7 +45,7 @@ public class HaKafkaUserCommands { props.putAll(modifiedProps); // 修改配置, 这里不使用changeUserOrUserClientIdConfig方法的原因是changeUserOrUserClientIdConfig这个方法会进行参数检查 - AdminUtils$.MODULE$.kafka$admin$AdminUtils$$changeEntityConfig(zkUtils, ConfigType.User(), kafkaUser, props); + AdminUtils$.MODULE$.kafka$admin$AdminUtils$$changeEntityConfig(zkUtils, ConfigType.User(), sanitize(kafkaUser), props); } catch (Exception e) { LOGGER.error("method=changeHaUserConfig||zookeeper={}||kafkaUser={}||modifiedProps={}||errMsg=exception", zookeeper, kafkaUser, modifiedProps, e); return false; @@ -73,7 +78,7 @@ public class HaKafkaUserCommands { } // 修改配置, 这里不使用changeUserOrUserClientIdConfig方法的原因是changeUserOrUserClientIdConfig这个方法会进行参数检查 - AdminUtils$.MODULE$.kafka$admin$AdminUtils$$changeEntityConfig(zkUtils, ConfigType.User(), kafkaUser, presentProps); + AdminUtils$.MODULE$.kafka$admin$AdminUtils$$changeEntityConfig(zkUtils, ConfigType.User(), sanitize(kafkaUser), presentProps); return true; }catch (Exception e){ @@ -90,4 +95,37 @@ public class HaKafkaUserCommands { private HaKafkaUserCommands() { } + + private static String sanitize(String name) { + String encoded = ""; + try { + encoded = URLEncoder.encode(name, StandardCharsets.UTF_8.name()); + StringBuilder builder = new StringBuilder(); + for (int i = 0; i < encoded.length(); i++) { + char c = encoded.charAt(i); + if (c == '*') { // Metric ObjectName treats * as pattern + builder.append("%2A"); + } else if (c == '+') { // Space URL-encoded as +, replace with percent encoding + builder.append("%20"); + } else { + builder.append(c); + } + } + return builder.toString(); + } catch (UnsupportedEncodingException e) { + throw new KafkaException(e); + } + } + + /** + * Desanitize name that was URL-encoded using {@link #sanitize(String)}. This + * is used to obtain the desanitized version of node names in ZooKeeper. 
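+     * e.g. desanitize("P%23my%20client") returns "P#my client".
+     * Currently unused in this change; kept alongside sanitize(String) for symmetry.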
+ */ + private static String desanitize(String name) { + try { + return URLDecoder.decode(name, StandardCharsets.UTF_8.name()); + } catch (UnsupportedEncodingException e) { + throw new KafkaException(e); + } + } } diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/TopicConnectionDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/TopicConnectionDao.java index 189caf90..55e9881d 100644 --- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/TopicConnectionDao.java +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/TopicConnectionDao.java @@ -16,4 +16,6 @@ public interface TopicConnectionDao { List getByTopicName(Long clusterId, String topicName, Date startTime, Date endTime); List getByAppId(String appId, Date startTime, Date endTime); + + List getByClusterAndAppId(Long clusterId, String appId, Date startTime, Date endTime); } \ No newline at end of file diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/impl/TopicConnectionDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/impl/TopicConnectionDaoImpl.java index 2c7da78f..d47ede4e 100644 --- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/impl/TopicConnectionDaoImpl.java +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/gateway/impl/TopicConnectionDaoImpl.java @@ -58,4 +58,14 @@ public class TopicConnectionDaoImpl implements TopicConnectionDao { params.put("endTime", endTime); return sqlSession.selectList("TopicConnectionDao.getByAppId", params); } + + @Override + public List getByClusterAndAppId(Long clusterId, String appId, Date startTime, Date endTime) { + Map params = new HashMap<>(4); + params.put("appId", appId); + params.put("clusterId", clusterId); + params.put("startTime", startTime); + params.put("endTime", endTime); + return sqlSession.selectList("TopicConnectionDao.getByClusterAndAppId", params); + } } \ No newline at end of file diff --git a/kafka-manager-dao/src/main/resources/mapper/HaActiveStandbySwitchJobDao.xml b/kafka-manager-dao/src/main/resources/mapper/HaActiveStandbySwitchJobDao.xml index c1128be8..3b016c1b 100644 --- a/kafka-manager-dao/src/main/resources/mapper/HaActiveStandbySwitchJobDao.xml +++ b/kafka-manager-dao/src/main/resources/mapper/HaActiveStandbySwitchJobDao.xml @@ -10,6 +10,8 @@ + + @@ -18,9 +20,9 @@ useGeneratedKeys="true" keyProperty="id"> INSERT INTO ks_km_physical_cluster - (active_cluster_phy_id, standby_cluster_phy_id, job_status, operator) + (active_cluster_phy_id, standby_cluster_phy_id, job_status, `type`, extend_data, operator) VALUES - (#{activeClusterPhyId}, #{standbyClusterPhyId}, #{jobStatus}, #{operator}) + (#{activeClusterPhyId}, #{standbyClusterPhyId}, #{jobStatus}, #{type}, #{extendData}, #{operator}) @@ -53,4 +49,14 @@ AND create_time >= #{startTime} AND #{endTime} >= create_time ]]> + + \ No newline at end of file diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/op/HaFlushASSwitchJob.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/ha/HaFlushASSwitchJob.java similarity index 96% rename from kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/op/HaFlushASSwitchJob.java rename to kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/ha/HaFlushASSwitchJob.java index d19726c4..c372339d 100644 --- 
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/op/HaFlushASSwitchJob.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/ha/HaFlushASSwitchJob.java
similarity index 96%
rename from kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/op/HaFlushASSwitchJob.java
rename to kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/ha/HaFlushASSwitchJob.java
index d19726c4..c372339d 100644
--- a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/op/HaFlushASSwitchJob.java
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/ha/HaFlushASSwitchJob.java
@@ -1,4 +1,4 @@
-package com.xiaojukeji.kafka.manager.task.dispatch.op;
+package com.xiaojukeji.kafka.manager.task.dispatch.ha;
 
 import com.xiaojukeji.kafka.manager.service.biz.job.HaASSwitchJobManager;
 import com.xiaojukeji.kafka.manager.service.service.ha.HaASSwitchJobService;
diff --git a/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/ha/HandleHaClientNewTopic.java b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/ha/HandleHaClientNewTopic.java
new file mode 100644
index 00000000..bc83a0a6
--- /dev/null
+++ b/kafka-manager-task/src/main/java/com/xiaojukeji/kafka/manager/task/dispatch/ha/HandleHaClientNewTopic.java
@@ -0,0 +1,156 @@
+//package com.xiaojukeji.kafka.manager.task.dispatch.ha;
+//
+//import com.xiaojukeji.kafka.manager.common.bizenum.ha.HaResTypeEnum;
+//import com.xiaojukeji.kafka.manager.common.bizenum.ha.job.HaJobStatusEnum;
+//import com.xiaojukeji.kafka.manager.common.constant.ConfigConstant;
+//import com.xiaojukeji.kafka.manager.common.constant.Constant;
+//import com.xiaojukeji.kafka.manager.common.entity.dto.ha.ASSwitchJobDTO;
+//import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
+//import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO;
+//import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASSwitchJobDO;
+//import com.xiaojukeji.kafka.manager.common.utils.HAUtils;
+//import com.xiaojukeji.kafka.manager.common.utils.Tuple;
+//import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
+//import com.xiaojukeji.kafka.manager.service.biz.job.HaASSwitchJobManager;
+//import com.xiaojukeji.kafka.manager.service.service.ClusterService;
+//import com.xiaojukeji.kafka.manager.service.service.ConfigService;
+//import com.xiaojukeji.kafka.manager.service.service.gateway.TopicConnectionService;
+//import com.xiaojukeji.kafka.manager.service.service.ha.HaASRelationService;
+//import com.xiaojukeji.kafka.manager.service.service.ha.HaASSwitchJobService;
+//import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
+//import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
+//import org.springframework.beans.factory.annotation.Autowired;
+//import org.springframework.stereotype.Component;
+//
+//import java.util.*;
+//import java.util.function.Function;
+//import java.util.stream.Collectors;
+//
+///**
+// * Active/standby switch task.
+// */
+//@Component
+//@CustomScheduled(name = "HandleHaClientNewTopic",
+//        cron = "0 0/2 * * * ?",
+//        threadNum = 1,
+//        description = "处理HAClient的新增Topic")
+//public class HandleHaClientNewTopic extends AbstractScheduledTask<ClusterDO> {
+//    @Autowired
+//    private ClusterService clusterService;
+//
+//    @Autowired
+//    private HaASRelationService haASRelationService;
+//
+//    @Autowired
+//    private TopicConnectionService topicConnectionService;
+//
+//    @Autowired
+//    private HaASSwitchJobManager haASSwitchJobManager;
+//
+//    @Autowired
+//    private HaASSwitchJobService haASSwitchJobService;
+//
+//    @Autowired
+//    private ConfigService configService;
+//
+//    @Override
+//    public List<ClusterDO> listAllTasks() {
+//        return clusterService.list();
+//    }
+//
+//    @Override
+//    public void processTask(ClusterDO clusterDO) {
+//        if (this.existRunningTask(clusterDO.getId())) {
+//            // A switch job is already running
+//            return;
+//        }
+//
+//        // Fetch the clients that already have HA established
+//        List<HaASRelationDO> doList = haASRelationService.listAllHAFromDB(clusterDO.getId(), HaResTypeEnum.KAFKA_USER_AND_CLIENT);
+//
+//        // Fetch the topics that already have HA established
+//        Map<String, HaASRelationDO> nameMap = haASRelationService.listAllHAFromDB(clusterDO.getId(), HaResTypeEnum.TOPIC)
+//                .stream()
+//                .collect(Collectors.toMap(HaASRelationDO::getActiveResName, Function.identity()));
+//
+//        // New active & standby clusters, and the topics that need to be switched
+//        Long newActiveClusterId = null;
+//        Long newStandbyClusterId = null;
+//        Map<String, HaASRelationDO> needSwitchTopicMap = new HashMap<>();
+//
+//        // Find the topics associated with each clientId
+//        for (HaASRelationDO asRelationDO: doList) {
+//            if (newActiveClusterId != null && !newActiveClusterId.equals(asRelationDO.getActiveClusterPhyId())) {
+//                // A single switch may target only one active cluster ID; ignore relations pointing at a different one
+//                continue;
+//            }
+//
+//            Tuple<String, String> userAndClient = HAUtils.splitKafkaUserAndClient(asRelationDO.getActiveResName());
+//            if (userAndClient == null || ValidateUtils.isBlank(userAndClient.getV2())) {
+//                continue;
+//            }
+//
+//            // Get the topics this client connects to
+//            Set<String> topicNameSet = topicConnectionService.getKafkaUserAndClientIdTopicNames(
+//                    new HashSet<>(Arrays.asList(asRelationDO.getActiveClusterPhyId(), asRelationDO.getStandbyClusterPhyId())),
+//                    userAndClient.getV1(),
+//                    userAndClient.getV2(),
+//                    new Date(System.currentTimeMillis() - configService.getLongValue(ConfigConstant.HA_CONNECTION_ACTIVE_TIME_UNIT_MIN, 20L) * 60L * 1000L),
+//                    new Date()
+//            );
+//
+//            // Iterate over the topics and check whether their active/standby relation matches expectations
+//            for (String topicName: topicNameSet) {
+//                HaASRelationDO topicRelation = nameMap.get(topicName);
+//                if (topicRelation == null
+//                        || asRelationDO.getActiveClusterPhyId().equals(topicRelation.getActiveClusterPhyId())) {
+//                    // No relation: the topic has no HA established, so skip it
+//                    // HA established and the topic's active/standby info matches the current clientId, so no switch is needed either
+//                    continue;
+//                }
+//
+//                // The active/standby info differs, so the topic needs to be switched
+//                if (needSwitchTopicMap.isEmpty()) {
+//                    newActiveClusterId = asRelationDO.getActiveClusterPhyId();
+//                    newStandbyClusterId = asRelationDO.getStandbyClusterPhyId();
+//                }
+//
+//                needSwitchTopicMap.put(topicName, topicRelation);
+//            }
+//        }
+//
+//        if (this.existRunningTask(clusterDO.getId())) {
+//            // Check again whether a running job exists
+//            return;
+//        }
+//
+//        // Create the switch job
+//        haASSwitchJobManager.createJob(
+//                this.convert2ASSwitchJobDTO(newActiveClusterId, newStandbyClusterId, new ArrayList<>(needSwitchTopicMap.values())),
+//                Constant.DEFAULT_USER_NAME
+//        );
+//    }
+//
+//    private ASSwitchJobDTO convert2ASSwitchJobDTO(Long newActiveClusterId, Long newStandbyClusterId, List<HaASRelationDO> doList) {
+//        ASSwitchJobDTO dto = new ASSwitchJobDTO();
+//        dto.setAll(false);
+//        dto.setMustContainAllKafkaUserTopics(false);
+//        dto.setActiveClusterPhyId(newActiveClusterId);
+//        dto.setStandbyClusterPhyId(newStandbyClusterId);
+//        dto.setTopicNameList(doList.stream().map(elem -> elem.getActiveResName()).collect(Collectors.toList()));
+//        dto.setKafkaUserAndClientIdList(new ArrayList<>()); // The clientId or kafkaUser has already been switched, so the background job skips this step
+//
+//        return dto;
+//    }
+//
+//    private boolean existRunningTask(Long clusterPhyId) {
+//        Map<Long, HaASSwitchJobDO> jobMap = haASSwitchJobService.listClusterLatestJobs();
+//
+//        HaASSwitchJobDO jobDO = jobMap.remove(clusterPhyId);
+//        if (jobDO == null || !HaJobStatusEnum.isRunning(jobDO.getJobStatus())) {
+//            return false;
+//        }
+//
+//        return true;
+//    }
+//}
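The commented-out HandleHaClientNewTopic task above decides whether a
KafkaUser+clientId connection is still "active" by querying connections over a
sliding window that ends now and starts HA_CONNECTION_ACTIVE_TIME_UNIT_MIN
minutes ago (defaulting to 20 when the config is unset). A minimal sketch of
that window computation, assuming the config value is in minutes as the
constant name suggests (hypothetical ActiveWindow class, not part of this
patch):

    import java.util.Date;

    public class ActiveWindow {
        // Mirrors the fallback used by the task above: 20 minutes.
        private static final long DEFAULT_ACTIVE_MINUTES = 20L;

        // Returns {startTime, endTime} for the connection query: only
        // connections reported within the last `minutes` count as active.
        public static Date[] activeWindow(long minutes) {
            long now = System.currentTimeMillis();
            return new Date[]{new Date(now - minutes * 60L * 1000L), new Date(now)};
        }

        public static void main(String[] args) {
            Date[] window = activeWindow(DEFAULT_ACTIVE_MINUTES);
            System.out.println("start=" + window[0] + ", end=" + window[1]);
        }
    }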
diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdAppController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdAppController.java
index 288b2eea..7e2c7c3e 100644
--- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdAppController.java
+++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdAppController.java
@@ -16,6 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.*;
 
+import java.util.HashSet;
 import java.util.List;
 
 /**
@@ -52,6 +53,10 @@ public class RdAppController {
     @PostMapping(value = "apps/relate-topics")
     @ResponseBody
     public Result<List<AppRelateTopicsVO>> appRelateTopics(@Validated @RequestBody AppRelateTopicsDTO dto) {
-        return haAppManager.appRelateTopics(dto.getClusterPhyId(), dto.getFilterTopicNameList());
+        if (dto.getUseKafkaUserAndClientId() != null && dto.getUseKafkaUserAndClientId()) {
+            return haAppManager.appAndClientRelateTopics(dto.getClusterPhyId(), new HashSet<>(dto.getFilterTopicNameList()));
+        }
+
+        return haAppManager.appRelateTopics(dto.getHa(), dto.getClusterPhyId(), dto.getFilterTopicNameList());
     }
 }
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 67662126..05556d51 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,7 +16,7 @@
 
-        2.8.0_e
+        2.8.1_e
         2.1.18.RELEASE
         2.9.2
         1.5.21
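A note on the guard in RdAppController#appRelateTopics: useKafkaUserAndClientId
is a boxed Boolean on the DTO, so unboxing it without the explicit null check
would throw a NullPointerException whenever the field is omitted from the
request body. An equivalent, more compact form of the same guard (a sketch, not
part of this patch):

    // True only when the flag is explicitly set to true; null and false both
    // fall through to the plain appRelateTopics() path.
    boolean useKafkaUserAndClientId = Boolean.TRUE.equals(dto.getUseKafkaUserAndClientId());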