Compare commits

..

3 Commits

Author SHA1 Message Date
孙超
0f15c773ef build dependencies version lock 2023-02-23 14:43:33 +08:00
fengqiongfeng
02b05fc7c8 HA: add high-availability related table schemas 2023-02-23 11:31:33 +08:00
zengqiao
b16a7b9bff v2.8.1_e initialization
1. Test code; open-source users should avoid using it.
2. Includes the Kafka-HA features; building on v2.8.0_e, adds switching by clientId.
3. Branch cut from v2.8.0_e.
2023-02-13 16:48:59 +08:00
44 changed files with 1759 additions and 610 deletions

View File

@@ -593,6 +593,10 @@ CREATE TABLE `work_order` (
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='work order table';
ALTER TABLE `topic_connections` ADD COLUMN `client_id` VARCHAR(1024) NOT NULL DEFAULT '' COMMENT 'client ID' AFTER `client_version`;
create table ha_active_standby_relation
(
id bigint unsigned auto_increment comment 'id'

View File

@@ -9,9 +9,13 @@ import lombok.Getter;
@Getter
public enum HaResTypeEnum {
CLUSTER(0, "Cluster"),
TOPIC(1, "Topic"),
KAFKA_USER(2, "KafkaUser"),
KAFKA_USER_AND_CLIENT(3, "KafkaUserAndClient"),
;
private final int code;
@@ -22,4 +26,4 @@ public enum HaResTypeEnum {
this.code = code;
this.msg = msg;
}
}
}
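The enum gains a fourth persisted code (3 = KafkaUserAndClient). A minimal sketch of resolving a stored resType code back to the enum; the valueOfCode helper below is illustrative, not part of this diff:

    // Hypothetical helper: resolve a persisted resType code back to the enum.
    public static HaResTypeEnum valueOfCode(int code) {
        for (HaResTypeEnum resType : HaResTypeEnum.values()) {
            if (resType.getCode() == code) {
                return resType;
            }
        }
        return null; // unknown code, e.g. written by a newer version
    }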

View File

@@ -33,6 +33,8 @@ public class ConfigConstant {
public static final String HA_SWITCH_JOB_TIMEOUT_UNIT_SEC_CONFIG_PREFIX = "HA_SWITCH_JOB_TIMEOUT_UNIT_SEC_CONFIG_CLUSTER";
public static final String HA_CONNECTION_ACTIVE_TIME_UNIT_MIN = "HA_CONNECTION_ACTIVE_TIME_UNIT_MIN";
private ConfigConstant() {
}
}
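The first constant ends in _PREFIX, which suggests per-cluster keys are composed by appending a cluster ID at runtime; the composition below is inferred from the naming, not shown in this diff:

    // Inferred usage: per-cluster switch-job timeout key = prefix + clusterPhyId.
    String key = ConfigConstant.HA_SWITCH_JOB_TIMEOUT_UNIT_SEC_CONFIG_PREFIX + clusterPhyId;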

View File

@@ -1,9 +1,12 @@
package com.xiaojukeji.kafka.manager.common.entity.ao.topic;
import lombok.Data;
/**
* @author zengqiao
* @date 20/4/20
*/
@Data
public class TopicConnection {
private Long clusterId;
@@ -19,72 +22,9 @@ public class TopicConnection {
private String clientVersion;
public Long getClusterId() {
return clusterId;
}
private String clientId;
public void setClusterId(Long clusterId) {
this.clusterId = clusterId;
}
private Long realConnectTime;
public String getTopicName() {
return topicName;
}
public void setTopicName(String topicName) {
this.topicName = topicName;
}
public String getAppId() {
return appId;
}
public void setAppId(String appId) {
this.appId = appId;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getHostname() {
return hostname;
}
public void setHostname(String hostname) {
this.hostname = hostname;
}
public String getClientType() {
return clientType;
}
public void setClientType(String clientType) {
this.clientType = clientType;
}
public String getClientVersion() {
return clientVersion;
}
public void setClientVersion(String clientVersion) {
this.clientVersion = clientVersion;
}
@Override
public String toString() {
return "TopicConnectionDTO{" +
"clusterId=" + clusterId +
", topicName='" + topicName + '\'' +
", appId='" + appId + '\'' +
", ip='" + ip + '\'' +
", hostname='" + hostname + '\'' +
", clientType='" + clientType + '\'' +
", clientVersion='" + clientVersion + '\'' +
'}';
}
private Long createTime;
}
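The pattern in this and the TopicConnectionDO/TopicConnectionVO diffs below is the same: hand-written getters, setters, and toString are deleted, and Lombok's @Data generates them at compile time. A minimal sketch of the equivalence, with the field list trimmed for illustration:

    import lombok.Data;

    // @Data generates getters, setters, equals/hashCode, and toString, so the
    // new clientId and realConnectTime fields need no hand-written accessors.
    @Data
    public class TopicConnection {
        private Long clusterId;
        private String topicName;
        private String clientId;      // added in this change
        private Long realConnectTime; // added: when the client last connected to a broker
    }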

View File

@@ -15,12 +15,4 @@ public class ASSwitchJobActionDTO {
@NotBlank(message = "action must not be blank")
@ApiModelProperty(value = "action, e.g. force")
private String action;
// @NotNull(message = "all must not be NULL")
// @ApiModelProperty(value = "all topics")
// private Boolean allJumpWaitInSync;
//
// @NotNull(message = "jumpWaitInSyncActiveTopicList must not be NULL")
// @ApiModelProperty(value = "topics to operate on")
// private List<String> jumpWaitInSyncActiveTopicList;
}

View File

@@ -4,6 +4,7 @@ import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import java.util.List;
@@ -27,5 +28,13 @@ public class ASSwitchJobDTO {
private Long standbyClusterPhyId;
@NotNull(message = "topicNameList不允许为NULL")
@ApiModelProperty(value="切换的Topic名称列表")
private List<String> topicNameList;
/**
* kafkaUser+Client列表
*/
@Valid
@ApiModelProperty(value="切换的KafkaUser&ClientId列表Client可以为空串")
private List<KafkaUserAndClientDTO> kafkaUserAndClientIdList;
}
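A sketch of how a caller might assemble a switch-job request against these DTOs; the values are illustrative and the setters are assumed to be Lombok-generated:

    // Illustrative only: a switch job at kafkaUser+clientId granularity.
    ASSwitchJobDTO dto = new ASSwitchJobDTO();
    dto.setStandbyClusterPhyId(2L);
    dto.setTopicNameList(Arrays.asList("order-topic"));

    KafkaUserAndClientDTO userAndClient = new KafkaUserAndClientDTO();
    userAndClient.setKafkaUser("order-app");
    userAndClient.setClientId(""); // an empty string switches the whole kafkaUser, per the Swagger note
    dto.setKafkaUserAndClientIdList(Collections.singletonList(userAndClient));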

View File

@@ -0,0 +1,18 @@
package com.xiaojukeji.kafka.manager.common.entity.dto.ha;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import javax.validation.constraints.NotBlank;
@Data
@ApiModel(description="KafkaUser和ClientId信息")
public class KafkaUserAndClientDTO {
@NotBlank(message = "kafkaUser不允许为空串")
@ApiModelProperty(value = "kafkaUser")
private String kafkaUser;
@ApiModelProperty(value = "clientId")
private String clientId;
}

View File

@@ -32,6 +32,9 @@ public class HaTopicRelationDTO {
@ApiModelProperty(value = "需要关联|解绑的topic名称列表")
private List<String> topicNames;
@ApiModelProperty(value = "解绑是否保留备集群资源topic,kafkaUser,group")
private Boolean retainStandbyResource;
@Override
public String toString() {
return "HaTopicRelationDTO{" +
@@ -39,6 +42,7 @@ public class HaTopicRelationDTO {
", standbyClusterId=" + standbyClusterId +
", all=" + all +
", topicNames=" + topicNames +
", retainStandbyResource=" + retainStandbyResource +
'}';
}

View File

@@ -21,4 +21,11 @@ public class AppRelateTopicsDTO {
@NotNull(message = "filterTopicNameList不允许为NULL")
@ApiModelProperty(value="过滤的Topic列表")
private List<String> filterTopicNameList;
@ApiModelProperty(value="使用KafkaUser+Client维度的数据默认是kafkaUser维度")
private Boolean useKafkaUserAndClientId;
@NotNull(message = "ha不允许为NULL")
@ApiModelProperty(value="查询是否高可用topic")
private Boolean ha;
}

View File

@@ -1,5 +1,7 @@
package com.xiaojukeji.kafka.manager.common.entity.pojo.gateway;
import lombok.Data;
import java.util.Date;
/**
@@ -7,6 +9,7 @@ import java.util.Date;
* @author zengqiao
* @date 20/7/6
*/
@Data
public class TopicConnectionDO {
private Long id;
@@ -22,87 +25,13 @@ public class TopicConnectionDO {
private String clientVersion;
private String clientId;
private Long realConnectTime;
private Date createTime;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public Long getClusterId() {
return clusterId;
}
public void setClusterId(Long clusterId) {
this.clusterId = clusterId;
}
public String getTopicName() {
return topicName;
}
public void setTopicName(String topicName) {
this.topicName = topicName;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public String getAppId() {
return appId;
}
public void setAppId(String appId) {
this.appId = appId;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getClientVersion() {
return clientVersion;
}
public void setClientVersion(String clientVersion) {
this.clientVersion = clientVersion;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
@Override
public String toString() {
return "TopicConnectionDO{" +
"id=" + id +
", clusterId=" + clusterId +
", topicName='" + topicName + '\'' +
", type='" + type + '\'' +
", appId='" + appId + '\'' +
", ip='" + ip + '\'' +
", clientVersion='" + clientVersion + '\'' +
", createTime=" + createTime +
'}';
}
public String uniqueKey() {
return appId + clusterId + topicName + type + ip;
return appId + clusterId + topicName + type + ip + clientId;
}
}
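uniqueKey() deduplicates connection records, so appending clientId means two connections that differ only in clientId are now kept as separate rows. A small illustration with made-up values:

    TopicConnectionDO conn = new TopicConnectionDO();
    conn.setClusterId(1L);
    conn.setTopicName("order-topic");
    conn.setType("produce");
    conn.setAppId("order-app");
    conn.setIp("10.0.0.12");
    conn.setClientId("client-1");
    // old key: "order-app" + 1 + "order-topic" + "produce" + "10.0.0.12"
    // new key: the same string with "client-1" appended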

View File

@@ -1,6 +1,7 @@
package com.xiaojukeji.kafka.manager.common.entity.pojo.ha;
import com.baomidou.mybatisplus.annotation.TableName;
import com.xiaojukeji.kafka.manager.common.bizenum.ha.HaResTypeEnum;
import com.xiaojukeji.kafka.manager.common.entity.pojo.BaseDO;
import lombok.AllArgsConstructor;
import lombok.Data;
@@ -37,6 +38,7 @@ public class HaASRelationDO extends BaseDO {
/**
* resource type
* @see HaResTypeEnum
*/
private Integer resType;

View File

@@ -1,10 +1,16 @@
package com.xiaojukeji.kafka.manager.common.entity.pojo.ha;
import com.baomidou.mybatisplus.annotation.TableName;
import com.xiaojukeji.kafka.manager.common.entity.dto.ha.KafkaUserAndClientDTO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.BaseDO;
import com.xiaojukeji.kafka.manager.common.utils.ConvertUtil;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.ArrayList;
import java.util.List;
/**
* HA active/standby switch job table
@@ -28,15 +34,35 @@ public class HaASSwitchJobDO extends BaseDO {
*/
private Integer jobStatus;
/**
* type: 0 = kafkaUser, 1 = kafkaUser+client
*/
private Integer type;
/**
* extended data
*/
private String extendData;
/**
* operator
*/
private String operator;
public HaASSwitchJobDO(Long activeClusterPhyId, Long standbyClusterPhyId, Integer jobStatus, String operator) {
public HaASSwitchJobDO(Long activeClusterPhyId, Long standbyClusterPhyId, Integer type, List<KafkaUserAndClientDTO> extendDataObj, Integer jobStatus, String operator) {
this.activeClusterPhyId = activeClusterPhyId;
this.standbyClusterPhyId = standbyClusterPhyId;
this.type = type;
this.extendData = ValidateUtils.isEmptyList(extendDataObj)? "": ConvertUtil.obj2Json(extendDataObj);
this.jobStatus = jobStatus;
this.operator = operator;
}
public List<KafkaUserAndClientDTO> getExtendRawData() {
if (ValidateUtils.isBlank(extendData)) {
return new ArrayList<>();
}
return ConvertUtil.str2ObjArrayByJson(extendData, KafkaUserAndClientDTO.class);
}
}
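The new type and extendData columns record the granularity of a switch job; extendData stores the kafkaUser+clientId list as JSON so getExtendRawData() can rebuild it later. A sketch of the round trip, with illustrative status/type codes and imports omitted:

    KafkaUserAndClientDTO scope = new KafkaUserAndClientDTO();
    scope.setKafkaUser("order-app");
    scope.setClientId("client-1");

    // The constructor serializes the list into the extendData JSON column.
    HaASSwitchJobDO jobDO = new HaASSwitchJobDO(1L, 2L, 1 /* kafkaUser+client */,
            Collections.singletonList(scope), 0 /* job status, illustrative */, "admin");

    // getExtendRawData() parses the JSON back; a blank column yields an empty list.
    List<KafkaUserAndClientDTO> parsed = jobDO.getExtendRawData();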

View File

@@ -2,11 +2,13 @@ package com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
/**
* @author zhongyuankai,zengqiao
* @date 20/4/8
*/
@Data
@ApiModel(value = "Topic连接信息")
public class TopicConnectionVO {
@ApiModelProperty(value = "集群ID")
@@ -30,72 +32,12 @@ public class TopicConnectionVO {
@ApiModelProperty(value = "客户端版本")
private String clientVersion;
public Long getClusterId() {
return clusterId;
}
@ApiModelProperty(value = "客户端ID")
private String clientId;
public void setClusterId(Long clusterId) {
this.clusterId = clusterId;
}
@ApiModelProperty(value = "连接Broker时间")
private Long realConnectTime;
public String getTopicName() {
return topicName;
}
public void setTopicName(String topicName) {
this.topicName = topicName;
}
public String getAppId() {
return appId;
}
public void setAppId(String appId) {
this.appId = appId;
}
public String getIp() {
return ip;
}
public void setIp(String ip) {
this.ip = ip;
}
public String getHostname() {
return hostname;
}
public void setHostname(String hostname) {
this.hostname = hostname;
}
public String getClientType() {
return clientType;
}
public void setClientType(String clientType) {
this.clientType = clientType;
}
public String getClientVersion() {
return clientVersion;
}
public void setClientVersion(String clientVersion) {
this.clientVersion = clientVersion;
}
@Override
public String toString() {
return "TopicConnectionVO{" +
"clusterId=" + clusterId +
", topicName='" + topicName + '\'' +
", appId='" + appId + '\'' +
", ip='" + ip + '\'' +
", hostname='" + hostname + '\'' +
", clientType='" + clientType + '\'' +
", clientVersion='" + clientVersion + '\'' +
'}';
}
@ApiModelProperty(value = "创建时间")
private Long createTime;
}

View File

@@ -3,7 +3,9 @@ package com.xiaojukeji.kafka.manager.common.entity.vo.rd.app;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.ArrayList;
import java.util.List;
/**
@@ -11,6 +13,7 @@ import java.util.List;
* @date 20/5/4
*/
@Data
@NoArgsConstructor
@ApiModel(description="App关联Topic信息")
public class AppRelateTopicsVO {
@ApiModelProperty(value="物理集群ID")
@@ -19,6 +22,12 @@ public class AppRelateTopicsVO {
@ApiModelProperty(value="kafkaUser")
private String kafkaUser;
@ApiModelProperty(value="clientId")
private String clientId;
@ApiModelProperty(value="已建立HA的Client")
private List<String> haClientIdList;
@ApiModelProperty(value="选中的Topic列表")
private List<String> selectedTopicNameList;
@@ -27,4 +36,37 @@ public class AppRelateTopicsVO {
@ApiModelProperty(value="未建立HA的Topic列表")
private List<String> notHaTopicNameList;
public AppRelateTopicsVO(Long clusterPhyId, String kafkaUser, String clientId) {
this.clusterPhyId = clusterPhyId;
this.kafkaUser = kafkaUser;
this.clientId = clientId;
this.selectedTopicNameList = new ArrayList<>();
this.notSelectTopicNameList = new ArrayList<>();
this.notHaTopicNameList = new ArrayList<>();
}
public void addSelectedIfNotExist(String topicName) {
if (selectedTopicNameList.contains(topicName)) {
return;
}
selectedTopicNameList.add(topicName);
}
public void addNotSelectedIfNotExist(String topicName) {
if (notSelectTopicNameList.contains(topicName)) {
return;
}
notSelectTopicNameList.add(topicName);
}
public void addNotHaIfNotExist(String topicName) {
if (notHaTopicNameList.contains(topicName)) {
return;
}
notHaTopicNameList.add(topicName);
}
}

View File

@@ -0,0 +1,29 @@
package com.xiaojukeji.kafka.manager.common.utils;
public class HAUtils {
public static String mergeKafkaUserAndClient(String kafkaUser, String clientId) {
if (ValidateUtils.isBlank(clientId)) {
return kafkaUser;
}
return String.format("%s#%s", kafkaUser, clientId);
}
public static Tuple<String, String> splitKafkaUserAndClient(String kafkaUserAndClientId) {
if (ValidateUtils.isBlank(kafkaUserAndClientId)) {
return null;
}
int idx = kafkaUserAndClientId.indexOf('#');
if (idx == -1) {
return null;
} else if (idx == kafkaUserAndClientId.length() - 1) {
return new Tuple<>(kafkaUserAndClientId.substring(0, idx), "");
}
return new Tuple<>(kafkaUserAndClientId.substring(0, idx), kafkaUserAndClientId.substring(idx + 1));
}
private HAUtils() {
}
}
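A sketch of the merge/split round trip these helpers implement, with illustrative values:

    String merged = HAUtils.mergeKafkaUserAndClient("order-app", "client-1"); // "order-app#client-1"
    Tuple<String, String> pair = HAUtils.splitKafkaUserAndClient(merged);
    // pair.v1() == "order-app", pair.v2() == "client-1"

    // A blank clientId merges to the bare kafkaUser, which then splits to null
    // (no '#'), so callers must treat null as "kafkaUser only".
    String userOnly = HAUtils.mergeKafkaUserAndClient("order-app", ""); // "order-app"
    Tuple<String, String> none = HAUtils.splitKafkaUserAndClient(userOnly); // null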

View File

@@ -79,10 +79,27 @@ public class JsonUtils {
TopicConnectionDO connectionDO = new TopicConnectionDO();
String[] appIdDetailArray = appIdDetail.toString().split("#");
if (appIdDetailArray.length >= 3) {
connectionDO.setAppId(appIdDetailArray[0]);
connectionDO.setIp(appIdDetailArray[1]);
connectionDO.setClientVersion(appIdDetailArray[2]);
if (appIdDetailArray == null) {
appIdDetailArray = new String[0];
}
connectionDO.setAppId(parseTopicConnections(appIdDetailArray, 0));
connectionDO.setIp(parseTopicConnections(appIdDetailArray, 1));
connectionDO.setClientVersion(parseTopicConnections(appIdDetailArray, 2));
// Parse the clientId (it may itself contain '#', so re-join the middle segments)
StringBuilder sb = new StringBuilder();
for (int i = 3; i < appIdDetailArray.length - 1; ++i) {
sb.append(parseTopicConnections(appIdDetailArray, i)).append("#");
}
// Guard against an empty builder (no clientId segment), which would make substring throw
connectionDO.setClientId(sb.length() > 0 ? sb.substring(0, sb.length() - 1) : "");
// Parse the receive time
Long receiveTime = ConvertUtil.string2Long(parseTopicConnections(appIdDetailArray, appIdDetailArray.length - 1));
if (receiveTime == null) {
connectionDO.setRealConnectTime(-1L);
} else {
connectionDO.setRealConnectTime(receiveTime);
}
connectionDO.setClusterId(clusterId);
@@ -95,4 +112,8 @@ public class JsonUtils {
}
return connectionDOList;
}
private static String parseTopicConnections(String[] appIdDetailArray, int idx) {
return (appIdDetailArray != null && appIdDetailArray.length >= idx + 1)? appIdDetailArray[idx]: "";
}
}
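From this parsing logic one can infer the gateway's per-connection detail format: appId#ip#clientVersion#<clientId, which may itself contain '#'>#receiveTime. The example below is inferred from the code, not from documentation:

    // Inferred wire format (illustrative value):
    String appIdDetail = "order-app#10.0.0.12#2.4.1#P#group-1#1676275739000";
    // After parsing: appId = "order-app", ip = "10.0.0.12", clientVersion = "2.4.1",
    // clientId = "P#group-1" (the inner '#' is preserved), realConnectTime = 1676275739000L.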

View File

@@ -0,0 +1,61 @@
package com.xiaojukeji.kafka.manager.common.utils;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import lombok.Data;
/**
* @Author: D10865
* @Description:
* @Date: Create on 2018/5/29 4:08 PM
* @Modified By
*/
@JsonIgnoreProperties(value = { "hibernateLazyInitializer", "handler" })
@Data
public class Tuple<T, V> {
private T v1;
private V v2;
public Tuple(){}
public Tuple(T v1, V v2) {
this.v1 = v1;
this.v2 = v2;
}
public T v1() {
return v1;
}
public Tuple<T, V> setV1(T v1) {
this.v1 = v1;
return this;
}
public V v2() {
return v2;
}
public Tuple<T, V> setV2(V v2) {
this.v2 = v2;
return this;
}
@Override
public boolean equals(Object o) {
if (this == o) {return true;}
if (o == null || getClass() != o.getClass()) {return false;}
Tuple<?, ?> tuple = (Tuple<?, ?>) o;
if (v1 != null ? !v1.equals(tuple.v1) : tuple.v1 != null) {return false;}
return v2 != null ? v2.equals(tuple.v2) : tuple.v2 == null;
}
@Override
public int hashCode() {
int result = v1 != null ? v1.hashCode() : 0;
result = 31 * result + (v2 != null ? v2.hashCode() : 0);
return result;
}
}

View File

@@ -4,10 +4,10 @@
"description": "",
"scripts": {
"prestart": "npm install --save-dev webpack-dev-server",
"start": "webpack serve",
"start": "webpack-dev-server",
"daily-build": "cross-env NODE_ENV=production webpack",
"pre-build": "cross-env NODE_ENV=production webpack",
"prod-build": "cross-env NODE_ENV=production webpack",
"prod-build": "cross-env NODE_OPTIONS=--max-old-space-size=8000 NODE_ENV=production webpack",
"fix-memory": "cross-env LIMIT=4096 increase-memory-limit"
},
"author": "",
@@ -54,10 +54,11 @@
"typescript": "^3.3.3333",
"url-loader": "^4.1.1",
"webpack": "^4.29.6",
"webpack-cli": "^4.9.1",
"webpack-cli": "^3.2.3",
"webpack-dev-server": "^3.11.3",
"xlsx": "^0.16.1"
},
"dependencies": {
"format-to-json": "^1.0.4"
}
}
}

View File

@@ -2,9 +2,11 @@ import * as React from 'react';
import { admin } from 'store/admin';
import { Modal, Form, Radio } from 'antd';
import { IBrokersMetadata, IBrokersRegions, IMetaData } from 'types/base-type';
import { Alert, message, notification, Table, Tooltip, Transfer } from 'component/antd';
import { getClusterHaTopicsStatus, setHaTopics, unbindHaTopics } from 'lib/api';
import { Alert, message, notification, Table, Tooltip, Spin } from 'component/antd';
import { getClusterHaTopicsStatus, getAppRelatedTopics, setHaTopics, unbindHaTopics } from 'lib/api';
import { cellStyle } from 'constants/table';
import { renderAttributes, TransferTable, IKafkaUser } from './TopicHaSwitch';
import './index.less';
const layout = {
labelCol: { span: 3 },
@@ -24,6 +26,7 @@ interface IHaTopic {
clusterId: number;
clusterName: string;
haRelation: number;
isHaRelation: boolean;
topicName: string;
key: string;
disabled?: boolean;
@@ -68,27 +71,104 @@ const resColumns = [
},
},
];
const columns = [
{
dataIndex: 'topicName',
title: 'Name',
width: 260,
ellipsis: true,
},
];
const kafkaUserColumn = [
{
dataIndex: 'kafkaUser',
title: 'kafkaUser',
width: 100,
ellipsis: true,
},
{
dataIndex: 'manualSelectedTopics',
title: 'Selected topics',
width: 120,
render: (text: string[]) => {
return text?.length ? renderAttributes({ data: text, limit: 3 }) : '-';
},
},
{
dataIndex: 'autoSelectedTopics',
title: 'Auto-selected related topics',
width: 120,
render: (text: string[]) => {
return text?.length ? renderAttributes({ data: text, limit: 3 }) : '-';
},
},
];
class TopicHaRelation extends React.Component<IXFormProps> {
public state = {
radioCheck: 'spec',
haTopics: [] as IHaTopic[],
topics: [] as IHaTopic[],
kafkaUsers: [] as IKafkaUser[],
targetKeys: [] as string[],
selectedKeys: [] as string[],
confirmLoading: false,
firstMove: true,
primaryActiveKeys: [] as string[],
primaryStandbyKeys: [] as string[],
manualSelectedKeys: [] as string[],
spinLoading: false,
};
public selectSingle = null as boolean;
public manualSelectedNames = [] as string[];
public setSelectSingle = (val: boolean) => {
this.selectSingle = val;
}
public setManualSelectedNames = (keys: string[]) => {
// this.manualSelectedNames = this.getTopicsByKeys(keys);
this.manualSelectedNames = keys;
}
public filterManualSelectedKeys = (key: string, selected: boolean) => {
const newManualSelectedKeys = [...this.state.manualSelectedKeys];
const index = newManualSelectedKeys.findIndex(item => item === key);
if (selected) {
if (index === -1) newManualSelectedKeys.push(key);
} else {
if (index !== -1) newManualSelectedKeys.splice(index, 1);
}
this.setManualSelectedNames(newManualSelectedKeys);
this.setState({
manualSelectedKeys: newManualSelectedKeys,
});
}
public getManualSelected = (single: boolean, key?: any, selected?: boolean) => {
this.setSelectSingle(single);
if (single) {
this.filterManualSelectedKeys(key, selected);
} else {
this.setManualSelectedNames(key);
this.setState({
manualSelectedKeys: key,
});
}
}
public handleOk = () => {
this.props.form.validateFields((err: any, values: any) => {
const unbindTopics = [];
const bindTopics = [];
const { primaryStandbyKeys, targetKeys} = this.state;
const unbindKeys = [];
const bindKeys = [];
if (values.rule === 'all') {
setHaTopics({
all: true,
activeClusterId: this.props.currentCluster.clusterId,
standbyClusterId: this.props.currentCluster.haClusterVO.clusterId,
standbyClusterId: this.props.currentCluster.haClusterVO?.clusterId,
topicNames: [],
}).then(res => {
handleMsg(res, 'Bind succeeded');
@@ -100,18 +180,18 @@ class TopicHaRelation extends React.Component<IXFormProps> {
return;
}
for (const item of this.state.primaryStandbyKeys) {
if (!this.state.targetKeys.includes(item)) {
unbindTopics.push(item);
for (const item of primaryStandbyKeys) {
if (!targetKeys.includes(item)) {
unbindKeys.push(item);
}
}
for (const item of this.state.targetKeys) {
if (!this.state.primaryStandbyKeys.includes(item)) {
bindTopics.push(item);
for (const item of targetKeys) {
if (!primaryStandbyKeys.includes(item)) {
bindKeys.push(item);
}
}
if (!unbindTopics.length && !bindTopics.length) {
if (!unbindKeys.length && !bindKeys.length) {
return message.info('Please select the topics to operate on');
}
@@ -140,15 +220,16 @@ class TopicHaRelation extends React.Component<IXFormProps> {
this.props.reload();
};
if (bindTopics.length) {
if (bindKeys.length) {
this.setState({
confirmLoading: true,
});
setHaTopics({
all: false,
activeClusterId: this.props.currentCluster.clusterId,
standbyClusterId: this.props.currentCluster.haClusterVO.clusterId,
topicNames: bindTopics,
standbyClusterId: this.props.currentCluster.haClusterVO?.clusterId,
// topicNames: this.getTopicsByKeys(bindKeys),
topicNames: bindKeys,
}).then(res => {
this.setState({
confirmLoading: false,
@@ -158,15 +239,17 @@ class TopicHaRelation extends React.Component<IXFormProps> {
});
}
if (unbindTopics.length) {
if (unbindKeys.length) {
this.setState({
confirmLoading: true,
});
unbindHaTopics({
all: false,
activeClusterId: this.props.currentCluster.clusterId,
standbyClusterId: this.props.currentCluster.haClusterVO.clusterId,
topicNames: unbindTopics,
standbyClusterId: this.props.currentCluster.haClusterVO?.clusterId,
// topicNames: this.getTopicsByKeys(unbindKeys),
topicNames: unbindKeys,
retainStandbyResource: values.retainStandbyResource,
}).then(res => {
this.setState({
confirmLoading: false,
@@ -194,43 +277,253 @@ class TopicHaRelation extends React.Component<IXFormProps> {
let isReset = false;
// Check whether this move restores the initial state
if (primaryStandbyKeys.length === targetKeys.length) {
targetKeys.sort((a, b) => +a - (+b));
primaryStandbyKeys.sort((a, b) => +a - (+b));
let i = 0;
while (i < targetKeys.length) {
if (targetKeys[i] === primaryStandbyKeys[i]) {
i++;
} else {
break;
}
}
isReset = i === targetKeys.length;
const diff = targetKeys.find(item => primaryStandbyKeys.indexOf(item) < 0);
isReset = !diff;
}
return isReset;
}
public getNewSelectKeys = (removeKeys: string[], selectedKeys: string[]) => {
const { topics, kafkaUsers } = this.state;
// For each removed key, also remove the other keys related to it
let relatedTopics: string[] = [];
const relatedKeys: string[] = [];
const newSelectKeys = [];
for (const key of removeKeys) {
const topicName = topics.find(row => row.key === key)?.topicName;
for (const item of kafkaUsers) {
if (item.selectedTopicNameList.includes(topicName)) {
relatedTopics = relatedTopics.concat(item.selectedTopicNameList, item.notSelectTopicNameList);
}
}
for (const item of relatedTopics) {
const key = topics.find(row => row.topicName === item)?.key;
if (key) {
relatedKeys.push(key);
}
}
for (const key of selectedKeys) {
if (!relatedKeys.includes(key)) {
newSelectKeys.push(key);
}
}
}
return newSelectKeys;
}
public setTopicsStatus = (targetKeys: string[], disabled: boolean, isAll = false) => {
const { haTopics } = this.state;
const newTopics = Array.from(haTopics);
const { topics } = this.state;
const newTopics = Array.from(topics);
if (isAll) {
for (let i = 0; i < haTopics.length; i++) {
for (let i = 0; i < topics.length; i++) {
newTopics[i].disabled = disabled;
}
} else {
for (const key of targetKeys) {
const index = haTopics.findIndex(item => item.key === key);
const index = topics.findIndex(item => item.key === key);
if (index > -1) {
newTopics[index].disabled = disabled;
}
}
}
this.setState(({
haTopics: newTopics,
topics: newTopics,
}));
}
public onTransferChange = (targetKeys: string[], direction: string, moveKeys: string[]) => {
const { primaryStandbyKeys, firstMove, primaryActiveKeys } = this.state;
public getTopicsByKeys = (keys: string[]) => {
// Look up topicNames by key
const topicNames: string[] = [];
for (const key of keys) {
const topicName = this.state.topics.find(item => item.key === key)?.topicName;
if (topicName) {
topicNames.push(topicName);
}
}
return topicNames;
}
public getNewKafkaUser = (targetKeys: string[]) => {
const { primaryStandbyKeys, kafkaUsers, topics } = this.state;
const removeKeys = [];
const addKeys = [];
for (const key of primaryStandbyKeys) {
if (targetKeys.indexOf(key) < 0) {
// removed
removeKeys.push(key);
}
}
for (const key of targetKeys) {
if (primaryStandbyKeys.indexOf(key) < 0) {
// added
addKeys.push(key);
}
}
const keepKeys = [...removeKeys, ...addKeys];
const newKafkaUsers = kafkaUsers;
const moveTopics = this.getTopicsByKeys(keepKeys);
for (const topic of moveTopics) {
for (const item of newKafkaUsers) {
if (item.selectedTopicNameList.includes(topic)) {
item.show = true;
}
}
}
const showKafkaUsers = newKafkaUsers.filter(item => item.show === true);
for (const item of showKafkaUsers) {
let i = 0;
while (i < moveTopics.length) {
if (!item.selectedTopicNameList.includes(moveTopics[i])) {
i++;
} else {
break;
}
}
// None of this kafkaUser's topics moved, so it should not be shown
if (i === moveTopics.length) {
item.show = false;
}
}
return showKafkaUsers;
}
public getAppRelatedTopicList = (selectedKeys: string[]) => {
const { topics, targetKeys, primaryStandbyKeys, kafkaUsers } = this.state;
const filterTopicNameList = this.getTopicsByKeys(selectedKeys);
const isReset = this.isPrimaryStatus(targetKeys);
if (!filterTopicNameList.length && isReset) {
// targetKeys
this.setState({
kafkaUsers: kafkaUsers.map(item => ({
...item,
show: false,
})),
});
return;
} else {
// Keep the selected items and the moved items
this.setState({
kafkaUsers: this.getNewKafkaUser(targetKeys),
});
}
// Selection is one-way, so take the clusterId from the current item
const clusterInfo = topics.find(item => item.topicName === filterTopicNameList[0]);
const clusterPhyId = clusterInfo?.clusterId;
if (!clusterPhyId) return;
this.setState({spinLoading: true});
getAppRelatedTopics({
clusterPhyId,
filterTopicNameList,
ha: clusterInfo.isHaRelation,
useKafkaUserAndClientId: false,
}).then((res: IKafkaUser[]) => {
let notSelectTopicNames: string[] = [];
const notSelectTopicKeys: string[] = [];
for (const item of (res || [])) {
notSelectTopicNames = notSelectTopicNames.concat(item.notSelectTopicNameList || []);
}
for (const item of notSelectTopicNames) {
const key = topics.find(row => row.topicName === item)?.key;
if (key && notSelectTopicKeys.indexOf(key) < 0) {
notSelectTopicKeys.push(key);
}
}
const newSelectedKeys = selectedKeys.concat(notSelectTopicKeys);
const newKafkaUsers = (res || []).map(item => ({
...item,
show: true,
manualSelectedTopics: item.selectedTopicNameList.filter(topic => this.manualSelectedNames.indexOf(topic) > -1),
autoSelectedTopics: [...item.selectedTopicNameList, ...item.notSelectTopicNameList].filter(topic => this.manualSelectedNames.indexOf(topic) === -1),
}));
const { kafkaUsers } = this.state;
for (const item of kafkaUsers) {
const resItem = res.find(row => row.kafkaUser === item.kafkaUser);
if (!resItem) {
newKafkaUsers.push(item);
}
}
this.setState({
kafkaUsers: newKafkaUsers,
selectedKeys: newSelectedKeys,
});
if (notSelectTopicKeys.length) {
this.getAppRelatedTopicList(newSelectedKeys);
}
}).finally(() => {
this.setState({spinLoading: false});
});
}
public getRelatedKeys = (currentKeys: string[]) => {
// Items that are no longer selected
const removeKeys = [];
// Compare with the previously recorded selection to find items deselected this time
const { selectedKeys } = this.state;
for (const preKey of selectedKeys) {
if (!currentKeys.includes(preKey)) {
removeKeys.push(preKey);
}
}
return removeKeys?.length ? this.getNewSelectKeys(removeKeys, currentKeys) : currentKeys;
}
public handleTopicChange = (sourceSelectedKeys: string[], targetSelectedKeys: string[]) => {
if (this.selectSingle) {
this.setSelectSingle(false);
} else {
this.getManualSelected(false, [...sourceSelectedKeys, ...targetSelectedKeys])
}
const { topics, targetKeys } = this.state;
// Only one side may be selected at a time (one-way operation)
const keys = [...sourceSelectedKeys, ...targetSelectedKeys];
// Determine which group the current selection belongs to
if (keys.length) {
const isHaRelation = topics.find(item => item.key === keys[0])?.isHaRelation;
const needDisabledKeys = topics.filter(item => item.isHaRelation !== isHaRelation).map(row => row.key);
this.setTopicsStatus(needDisabledKeys, true);
}
const selectedKeys = this.state.selectedKeys.length ? this.getRelatedKeys(keys) : keys;
const isReset = this.isPrimaryStatus(targetKeys);
if (!selectedKeys.length && isReset) {
this.setTopicsStatus([], false, true);
}
this.setState({
selectedKeys,
});
this.getAppRelatedTopicList(selectedKeys);
}
public onDirectChange = (targetKeys: string[], direction: string, moveKeys: string[]) => {
const { primaryStandbyKeys, firstMove, primaryActiveKeys, kafkaUsers } = this.state;
const getKafkaUser = () => {
const newKafkaUsers = kafkaUsers;
const moveTopics = this.getTopicsByKeys(moveKeys);
for (const topic of moveTopics) {
for (const item of newKafkaUsers) {
if (item.selectedTopicNameList.includes(topic)) {
item.show = true;
}
}
}
return newKafkaUsers;
};
// Check whether this move restores the initial state
const isReset = this.isPrimaryStatus(targetKeys);
if (firstMove) {
@@ -238,24 +531,26 @@ class TopicHaRelation extends React.Component<IXFormProps> {
this.setTopicsStatus(primaryKeys, true, false);
this.setState(({
firstMove: false,
kafkaUsers: getKafkaUser(),
targetKeys,
}));
return;
}
// If restored to the initial state, clear the disabled flags
if (isReset) {
this.setTopicsStatus([], false, true);
this.setState(({
firstMove: true,
targetKeys,
kafkaUsers: [],
}));
return;
}
this.setState({
this.setState(({
targetKeys,
});
kafkaUsers: this.getNewKafkaUser(targetKeys),
}));
}
public componentDidMount() {
@@ -265,17 +560,19 @@ class TopicHaRelation extends React.Component<IXFormProps> {
]).then(([activeRes, standbyRes]: IHaTopic[][]) => {
activeRes = (activeRes || []).map(row => ({
...row,
isHaRelation: row.haRelation === 1 || row.haRelation === 0,
key: row.topicName,
})).filter(item => item.haRelation === null);
})).filter(item => !item.isHaRelation);
standbyRes = (standbyRes || []).map(row => ({
...row,
isHaRelation: row.haRelation === 1 || row.haRelation === 0,
key: row.topicName,
})).filter(item => item.haRelation === 1 || item.haRelation === 0);
})).filter(item => item.isHaRelation);
this.setState({
haTopics: [].concat([...activeRes, ...standbyRes]).sort((a, b) => a.topicName.localeCompare(b.topicName)),
primaryActiveKeys: activeRes.map(row => row.topicName),
primaryStandbyKeys: standbyRes.map(row => row.topicName),
targetKeys: standbyRes.map(row => row.topicName),
topics: [].concat([...activeRes, ...standbyRes]).sort((a, b) => a.topicName.localeCompare(b.topicName)),
primaryActiveKeys: activeRes.map(row => row.key),
primaryStandbyKeys: standbyRes.map(row => row.key),
targetKeys: standbyRes.map(row => row.key),
});
});
}
@@ -287,6 +584,9 @@ class TopicHaRelation extends React.Component<IXFormProps> {
metadata = admin.brokersMetadata ? admin.brokersMetadata : metadata;
let regions = [] as IBrokersRegions[];
regions = admin.brokersRegions ? admin.brokersRegions : regions;
const { kafkaUsers, confirmLoading, radioCheck, targetKeys, selectedKeys, topics, primaryStandbyKeys, spinLoading} = this.state;
const tableData = kafkaUsers.filter(row => row.show);
return (
<>
<Modal
@@ -296,53 +596,78 @@ class TopicHaRelation extends React.Component<IXFormProps> {
onOk={this.handleOk}
onCancel={this.handleCancel}
maskClosable={false}
confirmLoading={this.state.confirmLoading}
width={590}
confirmLoading={confirmLoading}
width={800}
okText="Confirm"
cancelText="Cancel"
>
<Alert
message={`Bind the topics of [cluster ${currentCluster.clusterName}] and [cluster ${currentCluster.haClusterVO?.clusterName}] into a high-availability relation`}
message={`Bind the topics of [cluster ${currentCluster.clusterName}] and [cluster ${currentCluster.haClusterVO?.clusterName}] into a high-availability relation; this will also bind every topic under the same kafkaUser`}
type="info"
showIcon={true}
/>
<Form {...layout} name="basic" className="x-form">
{/* <Form.Item label="Rule">
{getFieldDecorator('rule', {
initialValue: 'spec',
rules: [{
required: true,
message: 'Please choose a rule',
}],
})(<Radio.Group onChange={this.handleRadioChange} >
<Radio value="all">Apply to all topics</Radio>
<Radio value="spec">Apply to specific topics</Radio>
</Radio.Group>)}
</Form.Item> */}
{this.state.radioCheck === 'spec' ? <Form.Item className="no-label" label="" >
{getFieldDecorator('topicNames', {
initialValue: this.state.targetKeys,
rules: [{
required: false,
message: 'Please select topics',
}],
})(
<Transfer
className="transfe-list"
dataSource={this.state.haTopics}
targetKeys={this.state.targetKeys}
showSearch={true}
onChange={this.onTransferChange}
render={item => item.topicName}
titles={['Unbound', 'Bound']}
locale={{
itemUnit: '',
itemsUnit: '',
}}
/>,
)}
</Form.Item> : ''}
</Form>
<Spin spinning={spinLoading}>
<Form {...layout} name="basic" className="x-form">
{/* <Form.Item label="Rule">
{getFieldDecorator('rule', {
initialValue: 'spec',
rules: [{
required: true,
message: 'Please choose a rule',
}],
})(<Radio.Group onChange={this.handleRadioChange} >
<Radio value="all">Apply to all topics</Radio>
<Radio value="spec">Apply to specific topics</Radio>
</Radio.Group>)}
</Form.Item> */}
{radioCheck === 'spec' ? <Form.Item className="no-label" label="" >
{getFieldDecorator('topicNames', {
initialValue: targetKeys,
rules: [{
required: false,
message: 'Please select topics',
}],
})(
<TransferTable
selectedKeys={selectedKeys}
topicChange={this.handleTopicChange}
onDirectChange={this.onDirectChange}
columns={columns}
dataSource={topics}
currentCluster={currentCluster}
getManualSelected={this.getManualSelected}
transferAttrs={{
titles: ['Unbound', 'Bound'],
}}
tableAttrs={{
className: 'no-table-header',
}}
/>,
)}
</Form.Item> : null}
{radioCheck === 'spec' ? <Table
className="modal-table-content no-lr-padding"
columns={kafkaUserColumn}
dataSource={tableData}
size="small"
rowKey="kafkaUser"
pagination={false}
scroll={{ y: 300 }}
/> : null}
{targetKeys.length < primaryStandbyKeys.length ? <Form.Item label="Data cleanup policy" labelCol={{span: 4}} wrapperCol={{span: 20}}>
{getFieldDecorator('retainStandbyResource', {
initialValue: false,
rules: [{
required: true,
message: 'Please choose a data cleanup policy',
}],
})(<Radio.Group>
<Radio value={false}>Delete standby resources</Radio>
<Radio value={true}>Retain standby resources</Radio>
</Radio.Group>)}
</Form.Item> : null}
</Form>
</Spin>
</Modal>
</>
);

View File

@@ -1,12 +1,13 @@
import * as React from 'react';
import { admin } from 'store/admin';
import { Modal, Form, Radio, Tag, Popover, Button } from 'antd';
import { Modal, Form, Radio, Tag, Popover, Button, Tooltip, Spin } from 'antd';
import { IBrokersMetadata, IBrokersRegions, IMetaData } from 'types/base-type';
import { Alert, Icon, message, Table, Transfer } from 'component/antd';
import { getClusterHaTopics, getAppRelatedTopics, createSwitchTask } from 'lib/api';
import { TooltipPlacement } from 'antd/es/tooltip';
import * as XLSX from 'xlsx';
import moment from 'moment';
import { cloneDeep } from 'lodash';
import { timeMinute } from 'constants/strategy';
const layout = {
@@ -35,13 +36,17 @@ interface IHaTopic {
disabled?: boolean;
}
interface IKafkaUser {
export interface IKafkaUser {
clusterPhyId: number;
kafkaUser: string;
notHaTopicNameList: string[];
notSelectTopicNameList: string[];
selectedTopicNameList: string[];
show: boolean;
manualSelectedTopics: string[];
autoSelectedTopics: string[];
clientId?: string;
haClientIdList?: string[]
}
const columns = [
@@ -71,17 +76,23 @@ const kafkaUserColumn = [
ellipsis: true,
},
{
dataIndex: 'selectedTopicNameList',
dataIndex: 'clientId',
title: 'clientID',
width: 100,
ellipsis: true,
},
{
dataIndex: 'manualSelectedTopics',
title: 'Selected topics',
width: 120,
// width: 120,
render: (text: string[]) => {
return text?.length ? renderAttributes({ data: text, limit: 3 }) : '-';
},
},
{
dataIndex: 'notSelectTopicNameList',
dataIndex: 'autoSelectedTopics',
title: 'Auto-selected related topics',
width: 120,
// width: 120,
render: (text: string[]) => {
return text?.length ? renderAttributes({ data: text, limit: 3 }) : '-';
},
@@ -89,7 +100,7 @@ const kafkaUserColumn = [
{
dataIndex: 'notHaTopicNameList',
title: 'Topics without HA',
width: 120,
// width: 120,
render: (text: string[]) => {
return text?.length ? renderAttributes({ data: text, limit: 3 }) : '-';
},
@@ -135,31 +146,64 @@ export const renderAttributes = (params: {
class TopicHaSwitch extends React.Component<IXFormProps> {
public state = {
radioCheck: 'spec',
switchMode: 'kafkaUser',
targetKeys: [] as string[],
selectedKeys: [] as string[],
topics: [] as IHaTopic[],
kafkaUsers: [] as IKafkaUser[],
primaryTopics: [] as string[],
primaryActiveKeys: [] as string[],
primaryStandbyKeys: [] as string[],
firstMove: true,
manualSelectedKeys: [] as string[],
selectTableColumn: kafkaUserColumn.filter(item => item.title !== 'clientID') as [],
spinLoading: false,
};
public selectSingle = null as boolean;
public manualSelectedNames = [] as string[];
public setSelectSingle = (val: boolean) => {
this.selectSingle = val;
}
public setManualSelectedNames = (keys: string[]) => {
// this.manualSelectedNames = this.getTopicsByKeys(keys);
this.manualSelectedNames = keys;
}
public filterManualSelectedKeys = (key: string, selected: boolean) => {
const newManualSelectedKeys = [...this.state.manualSelectedKeys];
const index = newManualSelectedKeys.findIndex(item => item === key);
if (selected) {
if (index === -1) newManualSelectedKeys.push(key);
} else {
if (index !== -1) newManualSelectedKeys.splice(index, 1);
}
this.setManualSelectedNames(newManualSelectedKeys);
this.setState({
manualSelectedKeys: newManualSelectedKeys,
});
}
public getManualSelected = (single: boolean, key?: any, selected?: boolean) => {
this.setSelectSingle(single);
if (single) {
this.filterManualSelectedKeys(key, selected);
} else {
this.setManualSelectedNames(key);
this.setState({
manualSelectedKeys: key,
});
}
}
public isPrimaryStatus = (targetKeys: string[]) => {
const { primaryStandbyKeys } = this.state;
let isReset = false;
// Check whether this move restores the initial state
if (primaryStandbyKeys.length === targetKeys.length) {
targetKeys.sort((a, b) => +a - (+b));
primaryStandbyKeys.sort((a, b) => +a - (+b));
let i = 0;
while (i < targetKeys.length) {
if (targetKeys[i] === primaryStandbyKeys[i]) {
i++;
} else {
break;
}
}
isReset = i === targetKeys.length;
const diff = targetKeys.find(item => primaryStandbyKeys.indexOf(item) < 0);
isReset = !diff;
}
return isReset;
}
@@ -168,16 +212,17 @@ class TopicHaSwitch extends React.Component<IXFormProps> {
const targetTopics = [];
for (const key of currentKeys) {
if (!primaryKeys.includes(key)) {
const topic = this.state.topics.find(item => item.key === key)?.topicName;
targetTopics.push(topic);
// const topic = this.state.topics.find(item => item.key === key)?.topicName;
// targetTopics.push(topic);
targetTopics.push(key);
}
}
return targetTopics;
}
public handleOk = () => {
const { primaryStandbyKeys, primaryActiveKeys, topics } = this.state;
const standbyClusterId = this.props.currentCluster.haClusterVO.clusterId;
const { primaryStandbyKeys, primaryActiveKeys, topics, kafkaUsers, switchMode } = this.state;
const standbyClusterId = this.props.currentCluster.haClusterVO?.clusterId;
const activeClusterId = this.props.currentCluster.clusterId;
this.props.form.validateFields((err: any, values: any) => {
@@ -188,6 +233,7 @@ class TopicHaSwitch extends React.Component<IXFormProps> {
all: true,
mustContainAllKafkaUserTopics: true,
standbyClusterPhyId: standbyClusterId,
kafkaUserAndClientIdList: [],
topicNameList: [],
}).then(res => {
message.success('Task created successfully');
@@ -217,11 +263,28 @@ class TopicHaSwitch extends React.Component<IXFormProps> {
const activeClusterPhyId = currentStandbyKeys.length > primaryStandbyKeys.length ? standbyClusterId : activeClusterId;
const standbyClusterPhyId = currentStandbyKeys.length > primaryStandbyKeys.length ? activeClusterId : standbyClusterId;
const targetTopics = this.getTargetTopics(currentKeys, primaryKeys);
const clientIdParams = kafkaUsers.map(item => ({ clientId: item.clientId, kafkaUser: item.kafkaUser }));
const kafkaUserParams = [] as any;
kafkaUsers.forEach(item => {
kafkaUserParams.push({
clientId: null,
kafkaUser: item.kafkaUser,
});
if (item.haClientIdList?.length) {
item.haClientIdList.forEach(clientId => {
kafkaUserParams.push({
clientId,
kafkaUser: item.kafkaUser,
});
});
}
});
createSwitchTask({
activeClusterPhyId,
all: false,
mustContainAllKafkaUserTopics: true,
mustContainAllKafkaUserTopics: switchMode === 'kafkaUser',
standbyClusterPhyId,
kafkaUserAndClientIdList: switchMode === 'clientID' ? clientIdParams : kafkaUserParams,
topicNameList: targetTopics,
}).then(res => {
message.success('Task created successfully');
@@ -252,8 +315,7 @@ class TopicHaSwitch extends React.Component<IXFormProps> {
const topicName = topics.find(row => row.key === key)?.topicName;
for (const item of kafkaUsers) {
if (item.selectedTopicNameList.includes(topicName)) {
relatedTopics = relatedTopics.concat(item.selectedTopicNameList);
relatedTopics = relatedTopics.concat(item.notSelectTopicNameList);
relatedTopics = relatedTopics.concat(item.selectedTopicNameList, item.notSelectTopicNameList);
}
}
for (const item of relatedTopics) {
@@ -291,21 +353,20 @@ class TopicHaSwitch extends React.Component<IXFormProps> {
}));
}
public getFilterTopics = (selectKeys: string[]) => {
public getTopicsByKeys = (keys: string[]) => {
// Look up topicNames by key
const filterTopics: string[] = [];
const targetKeys = selectKeys;
for (const key of targetKeys) {
const topicNames: string[] = [];
for (const key of keys) {
const topicName = this.state.topics.find(item => item.key === key)?.topicName;
if (topicName) {
filterTopics.push(topicName);
topicNames.push(topicName);
}
}
return filterTopics;
return topicNames;
}
public getNewKafkaUser = (targetKeys: string[]) => {
const { primaryStandbyKeys, topics } = this.state;
const { primaryStandbyKeys, kafkaUsers, topics } = this.state;
const removeKeys = [];
const addKeys = [];
for (const key of primaryStandbyKeys) {
@@ -321,9 +382,9 @@ class TopicHaSwitch extends React.Component<IXFormProps> {
}
}
const keepKeys = [...removeKeys, ...addKeys];
const newKafkaUsers = this.state.kafkaUsers;
const newKafkaUsers = kafkaUsers;
const moveTopics = this.getFilterTopics(keepKeys);
const moveTopics = this.getTopicsByKeys(keepKeys);
for (const topic of moveTopics) {
for (const item of newKafkaUsers) {
@@ -355,8 +416,8 @@ class TopicHaSwitch extends React.Component<IXFormProps> {
}
public getAppRelatedTopicList = (selectedKeys: string[]) => {
const { topics, targetKeys, primaryStandbyKeys, kafkaUsers } = this.state;
const filterTopicNameList = this.getFilterTopics(selectedKeys);
const { topics, targetKeys, primaryStandbyKeys, kafkaUsers, switchMode } = this.state;
const filterTopicNameList = this.getTopicsByKeys(selectedKeys);
const isReset = this.isPrimaryStatus(targetKeys);
if (!filterTopicNameList.length && isReset) {
@@ -376,10 +437,14 @@ class TopicHaSwitch extends React.Component<IXFormProps> {
}
// Selection is one-way, so take the activeClusterId from the current item
const clusterPhyId = topics.find(item => item.topicName === filterTopicNameList[0]).activeClusterId;
const clusterPhyId = topics.find(item => item.topicName === filterTopicNameList[0])?.activeClusterId;
if (!clusterPhyId) return;
this.setState({spinLoading: true});
getAppRelatedTopics({
clusterPhyId,
filterTopicNameList,
ha: true,
useKafkaUserAndClientId: switchMode === 'clientID',
}).then((res: IKafkaUser[]) => {
let notSelectTopicNames: string[] = [];
const notSelectTopicKeys: string[] = [];
@@ -390,7 +455,7 @@ class TopicHaSwitch extends React.Component<IXFormProps> {
for (const item of notSelectTopicNames) {
const key = topics.find(row => row.topicName === item)?.key;
if (key) {
if (key && notSelectTopicKeys.indexOf(key) < 0) {
notSelectTopicKeys.push(key);
}
}
@@ -399,11 +464,13 @@ class TopicHaSwitch extends React.Component<IXFormProps> {
const newKafkaUsers = (res || []).map(item => ({
...item,
show: true,
manualSelectedTopics: item.selectedTopicNameList.filter(topic => this.manualSelectedNames.indexOf(topic) > -1),
autoSelectedTopics: [...item.selectedTopicNameList, ...item.notSelectTopicNameList].filter(topic => this.manualSelectedNames.indexOf(topic) === -1),
}));
const { kafkaUsers } = this.state;
for (const item of kafkaUsers) {
const resItem = res.find(row => row.kafkaUser === item.kafkaUser);
const resItem = res.find(row => switchMode === 'clientID' ? row.kafkaUser === item.kafkaUser && row.clientId === item.clientId : row.kafkaUser === item.kafkaUser);
if (!resItem) {
newKafkaUsers.push(item);
}
@@ -416,6 +483,8 @@ class TopicHaSwitch extends React.Component<IXFormProps> {
if (notSelectTopicKeys.length) {
this.getAppRelatedTopicList(newSelectedKeys);
}
}).finally(() => {
this.setState({spinLoading: false});
});
}
@@ -440,7 +509,7 @@ class TopicHaSwitch extends React.Component<IXFormProps> {
// 判断当前选中项属于哪一类
if (keys.length) {
const activeClusterId = topics.find(item => item.key === keys[0]).activeClusterId;
const activeClusterId = topics.find(item => item.key === keys[0])?.activeClusterId;
const needDisabledKeys = topics.filter(item => item.activeClusterId !== activeClusterId).map(row => row.key);
this.setTopicsStatus(needDisabledKeys, true);
}
@@ -457,11 +526,11 @@ class TopicHaSwitch extends React.Component<IXFormProps> {
}
public onDirectChange = (targetKeys: string[], direction: string, moveKeys: string[]) => {
const { primaryStandbyKeys, firstMove, primaryActiveKeys, topics } = this.state;
const { primaryStandbyKeys, firstMove, primaryActiveKeys, kafkaUsers, topics } = this.state;
const getKafkaUser = () => {
const newKafkaUsers = this.state.kafkaUsers;
const moveTopics = this.getFilterTopics(moveKeys);
const newKafkaUsers = kafkaUsers;
const moveTopics = this.getTopicsByKeys(moveKeys);
for (const topic of moveTopics) {
for (const item of newKafkaUsers) {
if (item.selectedTopicNameList.includes(topic)) {
@@ -503,21 +572,27 @@ class TopicHaSwitch extends React.Component<IXFormProps> {
}
public downloadData = () => {
const { kafkaUsers } = this.state;
const { kafkaUsers, switchMode } = this.state;
const tableData = kafkaUsers.map(item => {
return {
const column = {
// tslint:disable
'kafkaUser': item.kafkaUser,
'Selected topics': item.selectedTopicNameList?.join('、'),
'Auto-selected related topics': item.notSelectTopicNameList?.join('、'),
'clientID': item.clientId,
'Selected topics': item.manualSelectedTopics?.join('、'),
'Auto-selected related topics': item.autoSelectedTopics?.join('、'),
'Topics without HA': item.notHaTopicNameList?.join('、'),
};
if (switchMode === 'kafkaUser') {
delete column.clientID;
}
return column;
});
const data = [].concat(tableData);
const wb = XLSX.utils.book_new();
// Convert the JSON rows to a worksheet
const header = ['kafkaUser', 'clientID', 'Selected topics', 'Auto-selected related topics', 'Topics without HA'];
const ws = XLSX.utils.json_to_sheet(data, {
header: ['kafkaUser', 'Selected topics', 'Auto-selected related topics', 'Topics without HA'],
header: switchMode === 'kafkaUser' ? header.filter(item => item !== 'clientID') : header,
});
// XLSX.utils.
XLSX.utils.book_append_sheet(wb, ws, 'kafkaUser');
@@ -537,18 +612,38 @@ class TopicHaSwitch extends React.Component<IXFormProps> {
return false;
}
public onModeChange = (e: any) => {
const mode = e.target.value;
// When the switch mode changes, reset the data
const { primaryTopics, primaryStandbyKeys } = this.state;
this.setState({
switchMode: mode,
topics: cloneDeep(primaryTopics),
targetKeys: primaryStandbyKeys,
selectedKeys: [],
kafkaUsers: [],
firstMove: true,
manualSelectedKeys: [],
selectTableColumn: mode === 'kafkaUser' ? kafkaUserColumn.filter(item => item.title !== 'clientID') : kafkaUserColumn,
});
this.props.form.setFieldsValue({targetKeys: primaryStandbyKeys});
this.setSelectSingle(null);
this.setManualSelectedNames([]);
}
public componentDidMount() {
const standbyClusterId = this.props.currentCluster.haClusterVO.clusterId;
const standbyClusterId = this.props.currentCluster.haClusterVO?.clusterId;
const activeClusterId = this.props.currentCluster.clusterId;
getClusterHaTopics(this.props.currentCluster.clusterId, standbyClusterId).then((res: IHaTopic[]) => {
res = res.map((item, index) => ({
key: index.toString(),
getClusterHaTopics(activeClusterId, standbyClusterId).then((res: IHaTopic[]) => {
res = res.map((item) => ({
key: item.topicName,
...item,
}));
const targetKeys = (res || []).filter((item) => item.activeClusterId === standbyClusterId).map(row => row.key);
const primaryActiveKeys = (res || []).filter((item) => item.activeClusterId === activeClusterId).map(row => row.key);
this.setState({
topics: res || [],
primaryTopics: cloneDeep(res) || [],
primaryStandbyKeys: targetKeys,
primaryActiveKeys,
targetKeys,
@@ -563,7 +658,15 @@ class TopicHaSwitch extends React.Component<IXFormProps> {
metadata = admin.brokersMetadata ? admin.brokersMetadata : metadata;
let regions = [] as IBrokersRegions[];
regions = admin.brokersRegions ? admin.brokersRegions : regions;
const tableData = this.state.kafkaUsers.filter(row => row.show);
const { switchMode, kafkaUsers, radioCheck, targetKeys, selectedKeys, topics, selectTableColumn, spinLoading } = this.state;
const tableData = kafkaUsers.filter(row => row.show);
const rulesNode = (
<div>
1. The ClientID format is P#… or C#….
<br />
2. For high-availability topics switched at ClientID granularity, check that the Topic and ClientID formats are correct.
</div>
);
return (
<Modal
@@ -580,62 +683,83 @@ class TopicHaSwitch extends React.Component<IXFormProps> {
}
>
<Alert
message={`Note: every topic related to the same kafkauser must have HA established and be selected before the task can run`}
message={`Note: every topic related to the same ${switchMode} must have HA established and be selected before the task can run`}
type="info"
showIcon={true}
/>
<Form {...layout} name="basic" className="x-form">
{/* <Form.Item label="Rule" >
{getFieldDecorator('rule', {
initialValue: 'spec',
rules: [{
required: true,
message: 'Please choose a rule',
}],
})(<Radio.Group onChange={this.handleRadioChange} >
<Radio value="all">Apply to all topics</Radio>
<Radio value="spec">Apply to specific topics</Radio>
</Radio.Group>)}
</Form.Item> */}
{this.state.radioCheck === 'spec' ? <Form.Item className="no-label" label="" >
{getFieldDecorator('targetKeys', {
initialValue: this.state.targetKeys,
rules: [{
required: false,
message: 'Please select topics',
}],
})(
<TransferTable
selectedKeys={this.state.selectedKeys}
topicChange={this.handleTopicChange}
onDirectChange={this.onDirectChange}
dataSource={this.state.topics}
currentCluster={currentCluster}
/>,
)}
</Form.Item> : ''}
</Form>
{this.state.radioCheck === 'spec' ?
<>
<Table
className="modal-table-content"
columns={kafkaUserColumn}
dataSource={tableData}
size="small"
rowKey="kafkaUser"
pagination={false}
scroll={{ y: 300 }}
/>
{this.state.kafkaUsers.length ? <div onClick={this.downloadData} className="modal-table-download"><a>Download</a></div> : null}
</>
: null}
<Spin spinning={spinLoading}>
<Form {...layout} name="basic" className="x-form">
<Form.Item label="切换维度">
{getFieldDecorator('switchMode', {
initialValue: 'kafkaUser',
rules: [{
required: true,
message: '请选择切换维度',
}],
})(<Radio.Group onChange={this.onModeChange}>
<Radio value="kafkaUser">kafkaUser</Radio>
<Radio value="clientID">kafkaUser + clientID</Radio>
</Radio.Group>)}
</Form.Item>
{/* <Form.Item label="Rule" >
{getFieldDecorator('rule', {
initialValue: 'spec',
rules: [{
required: true,
message: 'Please choose a rule',
}],
})(<Radio.Group onChange={this.handleRadioChange} >
<Radio value="all">Apply to all topics</Radio>
<Radio value="spec">Apply to specific topics</Radio>
</Radio.Group>)}
</Form.Item> */}
{switchMode === 'clientID' && <div style={{ margin: '-10px 0 10px 0' }} >
<Tooltip placement="bottomLeft" title={rulesNode}>
<a>ClientID rules</a>
</Tooltip>
</div>}
{radioCheck === 'spec' ? <Form.Item className="no-label" label="" >
{getFieldDecorator('targetKeys', {
initialValue: targetKeys,
rules: [{
required: false,
message: 'Please select topics',
}],
})(
<TransferTable
selectedKeys={selectedKeys}
topicChange={this.handleTopicChange}
onDirectChange={this.onDirectChange}
columns={columns}
dataSource={topics}
currentCluster={currentCluster}
getManualSelected={this.getManualSelected}
/>,
)}
</Form.Item> : null}
</Form>
{radioCheck === 'spec' ?
<>
<Table
className="modal-table-content"
columns={selectTableColumn}
dataSource={tableData}
size="small"
rowKey="kafkaUser"
pagination={false}
scroll={{ y: 300 }}
/>
{tableData.length ? <div onClick={this.downloadData} className="modal-table-download"><a>Download</a></div> : null}
</>
: null}
</Spin>
</Modal>
);
}
}
export const TopicSwitchWrapper = Form.create<IXFormProps>()(TopicHaSwitch);
const TableTransfer = ({ leftColumns, ...restProps }: any) => (
export const TableTransfer = ({ leftColumns, getManualSelected, tableAttrs, ...restProps }: any) => (
<Transfer {...restProps} showSelectAll={true}>
{({
filteredItems,
@@ -651,6 +775,7 @@ const TableTransfer = ({ leftColumns, ...restProps }: any) => (
disabled: item.disabled,
}),
onSelect({ key }: any, selected: any) {
getManualSelected(true, key, selected);
onItemSelect(key, selected);
},
selectedRowKeys: listSelectedKeys,
@@ -668,9 +793,11 @@ const TableTransfer = ({ leftColumns, ...restProps }: any) => (
onRow={({ key, disabled }) => ({
onClick: () => {
if (disabled) return;
getManualSelected(true, key, listSelectedKeys.includes(key));
onItemSelect(key, !listSelectedKeys.includes(key));
},
})}
{...tableAttrs}
/>
);
}}
@@ -683,8 +810,12 @@ interface IProps {
onDirectChange?: any;
currentCluster: any;
topicChange: any;
columns: any[];
dataSource: any[];
selectedKeys: string[];
getManualSelected: any;
transferAttrs?: any;
tableAttrs?: any;
}
export class TransferTable extends React.Component<IProps> {
@@ -695,7 +826,7 @@ export class TransferTable extends React.Component<IProps> {
}
public render() {
const { currentCluster, dataSource, value, topicChange, selectedKeys } = this.props;
const { currentCluster, columns, dataSource, value, topicChange, selectedKeys, getManualSelected, transferAttrs, tableAttrs } = this.props;
return (
<div>
<TableTransfer
@@ -705,12 +836,16 @@ export class TransferTable extends React.Component<IProps> {
showSearch={true}
onChange={this.onChange}
onSelectChange={topicChange}
filterOption={(inputValue: string, item: any) => item.topicName?.indexOf(inputValue) > -1}
leftColumns={columns}
titles={[`Cluster ${currentCluster.clusterName}`, `Cluster ${currentCluster.haClusterVO.clusterName}`]}
titles={[`Cluster ${currentCluster.clusterName}`, `Cluster ${currentCluster.haClusterVO?.clusterName}`]}
locale={{
itemUnit: '',
itemsUnit: '',
}}
getManualSelected={getManualSelected}
tableAttrs={tableAttrs}
{...transferAttrs}
/>
</div>
);

View File

@@ -0,0 +1,8 @@
.ant-table-wrapper.no-lr-padding {
padding-left: 0!important;
padding-right: 0!important;
}
.no-table-header .ant-table-header {
display: none;
}

View File

@@ -6,6 +6,8 @@ import { Table, Tooltip } from 'component/antd';
import { SearchAndFilterContainer } from 'container/search-filter';
import Url from 'lib/url-parser';
import { pagination, cellStyle } from 'constants/table';
import moment = require('moment');
import { timeFormat } from 'constants/strategy';
@observer
export class ConnectInformation extends SearchAndFilterContainer {
@@ -27,44 +29,70 @@ export class ConnectInformation extends SearchAndFilterContainer {
title: 'Client type',
dataIndex: 'clientType',
key: 'clientType',
width: '20%',
width: 130,
filters: [{ text: 'Consume', value: 'consumer' }, { text: 'Produce', value: 'produce' }],
onFilter: (value: string, record: IConnectionInfo) => record.clientType.indexOf(value) === 0,
render: (t: string) =>
<span>{t === 'consumer' ? 'Consume' : 'Produce'}</span>,
}, this.renderColumnsFilter('filterVisible'));
const columns = [{
title: 'AppID',
dataIndex: 'appId',
key: 'appId',
width: '20%',
sorter: (a: IConnectionInfo, b: IConnectionInfo) => a.appId.charCodeAt(0) - b.appId.charCodeAt(0),
},
{
title: 'Hostname',
dataIndex: 'hostname',
key: 'hostname',
width: '40%',
onCell: () => ({
style: {
maxWidth: 250,
...cellStyle,
},
}),
render: (t: string) => {
return (
<Tooltip placement="bottomLeft" title={t} >{t}</Tooltip>
);
const columns = [
{
title: 'AppID',
dataIndex: 'appId',
key: 'appId',
width: '30%',
sorter: (a: IConnectionInfo, b: IConnectionInfo) => a.appId.charCodeAt(0) - b.appId.charCodeAt(0),
},
{
title: 'clientID',
dataIndex: 'clientId',
key: 'clientId',
width: '30%',
onCell: () => ({
style: {
maxWidth: 250,
...cellStyle,
},
}),
render: (t: string) => {
return (
<Tooltip placement="bottomLeft" title={t} >{t}</Tooltip>
);
},
},
{
title: 'Hostname',
dataIndex: 'hostname',
key: 'hostname',
width: '30%',
onCell: () => ({
style: {
maxWidth: 250,
...cellStyle,
},
}),
render: (t: string) => {
return (
<Tooltip placement="bottomLeft" title={t} >{t}</Tooltip>
);
},
},
{
title: 'Client version',
dataIndex: 'clientVersion',
key: 'clientVersion',
width: 130,
},
},
{
title: 'Client version',
dataIndex: 'clientVersion',
key: 'clientVersion',
width: '20%',
},
clientType,
{
title: 'Last access time',
dataIndex: 'realConnectTime',
key: 'realConnectTime',
width: 170,
render: (t: number) => moment(t).format(timeFormat),
sorter: (a: IConnectionInfo, b: IConnectionInfo) => a.realConnectTime - b.realConnectTime,
},
];
if (connectInfo) {
return (

View File

@@ -75,6 +75,8 @@ export interface IConnectionInfo {
hostname: string;
ip: string;
topicName: string;
clientId?: string;
realConnectTime?: number;
key?: number;
}

View File

@@ -130,7 +130,11 @@ module.exports = {
historyApiFallback: true,
proxy: {
'/api/v1/': {
target: 'http://127.0.0.1:8080/',
// target: 'http://117.51.150.133:8080/',
target: 'http://10.190.55.249:8080/',
// target: 'http://10.179.37.199:8008',
// target: 'http://10.179.148.210:8080',
// target: 'http://99.11.45.164:8888',
changeOrigin: true,
}
},

View File

@@ -4,13 +4,15 @@ import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.vo.rd.app.AppRelateTopicsVO;
import java.util.List;
import java.util.Set;
/**
* HA App management
*/
public interface HaAppManager {
Result<List<AppRelateTopicsVO>> appRelateTopics(Long clusterPhyId, List<String> filterTopicNameList);
Result<List<AppRelateTopicsVO>> appRelateTopics(Boolean ha, Long clusterPhyId, List<String> filterTopicNameList);
Result<List<AppRelateTopicsVO>> appAndClientRelateTopics(Long clusterPhyId, Set<String> filterTopicNameSet);
boolean isContainAllRelateAppTopics(Long clusterPhyId, List<String> filterTopicNameList);
}

View File

@@ -3,6 +3,7 @@ package com.xiaojukeji.kafka.manager.service.biz.ha;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.TopicOperationResult;
import com.xiaojukeji.kafka.manager.common.entity.ao.ha.HaSwitchTopic;
import com.xiaojukeji.kafka.manager.common.entity.dto.ha.KafkaUserAndClientDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.HaTopicRelationDTO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.JobLogDO;
@@ -37,6 +38,7 @@ public interface HaTopicManager {
Result<HaSwitchTopic> switchHaWithCanRetry(Long newActiveClusterPhyId,
Long newStandbyClusterPhyId,
List<String> switchTopicNameList,
List<KafkaUserAndClientDTO> kafkaUserAndClientIdList,
boolean focus,
boolean firstTriggerExecute,
JobLogDO switchLogTemplate,

View File

@@ -1,15 +1,25 @@
package com.xiaojukeji.kafka.manager.service.biz.ha.impl;
import com.xiaojukeji.kafka.manager.common.bizenum.ha.HaResTypeEnum;
import com.xiaojukeji.kafka.manager.common.constant.ConfigConstant;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO;
import com.xiaojukeji.kafka.manager.common.entity.vo.rd.app.AppRelateTopicsVO;
import com.xiaojukeji.kafka.manager.common.utils.FutureUtil;
import com.xiaojukeji.kafka.manager.service.biz.ha.HaAppManager;
import com.xiaojukeji.kafka.manager.service.service.ConfigService;
import com.xiaojukeji.kafka.manager.service.service.TopicManagerService;
import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService;
import com.xiaojukeji.kafka.manager.service.service.gateway.TopicConnectionService;
import com.xiaojukeji.kafka.manager.service.service.ha.HaASRelationService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@@ -22,17 +32,45 @@ public class HaAppManagerImpl implements HaAppManager {
@Autowired
private HaASRelationService haASRelationService;
@Autowired
private TopicConnectionService topicConnectionService;
@Autowired
private ConfigService configService;
@Autowired
private TopicManagerService topicManagerService;
private static final FutureUtil<Result<List<AppRelateTopicsVO>>> ConnectionsSearchTP = FutureUtil.init(
"ConnectionsSearchTP",
5,
5,
500
);
@Override
public Result<List<AppRelateTopicsVO>> appRelateTopics(Long clusterPhyId, List<String> filterTopicNameList) {
public Result<List<AppRelateTopicsVO>> appRelateTopics(Boolean ha, Long clusterPhyId, List<String> filterTopicNameList) {
// Get the list of related topics
Map<String, Set<String>> userTopicMap = this.appRelateTopicsMap(clusterPhyId, filterTopicNameList);
Map<String, Set<String>> appClientSetMap = haASRelationService.listAllHAClient(clusterPhyId, userTopicMap.keySet());
// Get the topics on this cluster that already have HA established
Set<String> haTopicNameSet = haASRelationService.listAllHAFromDB(clusterPhyId, HaResTypeEnum.TOPIC)
.stream()
.map(elem -> elem.getActiveResName())
.collect(Collectors.toSet());
Set<String> topicNameSet = null;
if (ha) {
topicNameSet = haTopicNameSet;
} else {
List<TopicDO> topicDOS = topicManagerService.getByClusterId(clusterPhyId);
topicNameSet = topicDOS.stream()
.filter(topicBizPO -> !haTopicNameSet.contains(topicBizPO.getTopicName()))
.map(TopicDO::getTopicName).collect(Collectors.toSet());
}
Set<String> filterTopicNameSet = new HashSet<>(filterTopicNameList);
List<AppRelateTopicsVO> voList = new ArrayList<>();
@@ -40,16 +78,18 @@ public class HaAppManagerImpl implements HaAppManager {
AppRelateTopicsVO vo = new AppRelateTopicsVO();
vo.setClusterPhyId(clusterPhyId);
vo.setKafkaUser(entry.getKey());
vo.setHaClientIdList(new ArrayList<>(appClientSetMap.getOrDefault(entry.getKey(), new HashSet<>())));
vo.setSelectedTopicNameList(new ArrayList<>());
vo.setNotSelectTopicNameList(new ArrayList<>());
vo.setNotHaTopicNameList(new ArrayList<>());
Set<String> finalTopicNameSet = topicNameSet;
entry.getValue().forEach(elem -> {
if (elem.startsWith("__")) {
// ignore
return;
}
if (!haTopicNameSet.contains(elem)) {
if (!finalTopicNameSet.contains(elem)) {
vo.getNotHaTopicNameList().add(elem);
} else if (filterTopicNameSet.contains(elem)) {
vo.getSelectedTopicNameList().add(elem);
@@ -64,6 +104,104 @@ public class HaAppManagerImpl implements HaAppManager {
return Result.buildSuc(voList);
}
@Override
public Result<List<AppRelateTopicsVO>> appAndClientRelateTopics(Long clusterPhyId, Set<String> filterTopicNameSet) {
List<HaASRelationDO> haASRelationDOList = haASRelationService.listAllHAFromDB(clusterPhyId, HaResTypeEnum.CLUSTER);
Long secondClusterId = null;
for (HaASRelationDO asRelationDO: haASRelationDOList) {
if (clusterPhyId.equals(asRelationDO.getActiveClusterPhyId())) {
secondClusterId = asRelationDO.getStandbyClusterPhyId();
} else {
secondClusterId = asRelationDO.getActiveClusterPhyId();
}
break;
}
Map<String/*TopicName*/, Result<Map<String/*KafkaUser*/, Set<String>/*ClientID*/>>> connectionsResultMap = new ConcurrentHashMap<>();
// Active-connection time window (minutes)
Long activeMin = configService.getLongValue(ConfigConstant.HA_CONNECTION_ACTIVE_TIME_UNIT_MIN, Constant.TOPIC_CONNECTION_LATEST_TIME_MS / 1000 / 60);
// Fetch the connections related to each topic
for (String topicName: filterTopicNameSet) {
Long tempSecondClusterId = secondClusterId;
ConnectionsSearchTP.runnableTask(
String.format("clusterPhyId=%d||topicName=%s", clusterPhyId, topicName),
10000,
() -> {
Result<Map<String, Set<String>>> userAndClientMapResult = topicConnectionService.getHaKafkaUserAndClientIdByTopicName(
clusterPhyId,
tempSecondClusterId,
topicName,
new Date(System.currentTimeMillis() - activeMin * 60L * 1000L),
new Date()
);
connectionsResultMap.put(topicName, userAndClientMapResult);
}
);
}

// Wait outside the submit loop so the per-topic lookups actually run concurrently
ConnectionsSearchTP.waitExecute(10000);
// This interface is important, so fail fast and return an error as soon as any lookup fails
for (Result<Map<String, Set<String>>> valueResult: connectionsResultMap.values()) {
if (valueResult.failed()) {
return Result.buildFromIgnoreData(valueResult);
}
}
// Merge the per-topic query results into a single map
Map<String/*KafkaUser*/, Set<String>/*ClientID*/> kafkaUserAndClientMap = new HashMap<>();
for (Result<Map<String, Set<String>>> valueResult: connectionsResultMap.values()) {
for (Map.Entry<String, Set<String>> entry: valueResult.getData().entrySet()) {
kafkaUserAndClientMap.putIfAbsent(entry.getKey(), new HashSet<>());
kafkaUserAndClientMap.get(entry.getKey()).addAll(entry.getValue());
}
}
// Get the topics on this cluster that already have HA established
Set<String> haTopicNameSet = haASRelationService.listAllHAFromDB(clusterPhyId, HaResTypeEnum.TOPIC)
.stream()
.map(elem -> elem.getActiveResName())
.collect(Collectors.toSet());
// Get the topics under each KafkaUser+Client
List<AppRelateTopicsVO> voList = new ArrayList<>();
for (Map.Entry<String, Set<String>> entry: kafkaUserAndClientMap.entrySet()) {
Long tempSecondClusterId = secondClusterId;
ConnectionsSearchTP.runnableTask(
"",
10000,
() -> {
Result<List<TopicConnectionDO>> doListResult = topicConnectionService.getByClusterAndAppId(
clusterPhyId,
tempSecondClusterId,
entry.getKey(),
new Date(System.currentTimeMillis() - activeMin * 60L * 1000L),
new Date()
);
if (doListResult.failed()) {
return Result.buildFromIgnoreData(doListResult);
}
return Result.buildSuc(convert2VOList(clusterPhyId, entry.getValue(), doListResult.getData(), haTopicNameSet, filterTopicNameSet));
}
);
for (Result<List<AppRelateTopicsVO>> elem: ConnectionsSearchTP.waitResult(10000)) {
if (elem.failed()) {
return Result.buildFromIgnoreData(elem);
}
voList.addAll(elem.getData());
}
}
return Result.buildSuc(voList);
}
@Override
public boolean isContainAllRelateAppTopics(Long clusterPhyId, List<String> filterTopicNameList) {
Map<String, Set<String>> userTopicMap = this.appRelateTopicsMap(clusterPhyId, filterTopicNameList);
@@ -91,4 +229,41 @@ public class HaAppManagerImpl implements HaAppManager {
return userTopicMap;
}
private List<AppRelateTopicsVO> convert2VOList(Long clusterPhyId,
Set<String> clientIdSet,
List<TopicConnectionDO> connectionList,
Set<String> haTopicNameSet,
Set<String> filterTopicNameSet) {
Map<String/*clientID*/, AppRelateTopicsVO> voMap = new HashMap<>();
for (TopicConnectionDO connection: connectionList) {
if (connection.getTopicName().startsWith("__")) {
// Ignore internal system topics
continue;
}
if (!clientIdSet.contains("") && !clientIdSet.contains(connection.getClientId())) {
continue;
}
AppRelateTopicsVO vo = voMap.get(connection.getClientId());
if (vo == null) {
vo = new AppRelateTopicsVO(clusterPhyId, connection.getAppId(), connection.getClientId());
}
if (!haTopicNameSet.contains(connection.getTopicName())) {
vo.addNotHaIfNotExist(connection.getTopicName());
}
if (!filterTopicNameSet.contains(connection.getTopicName())) {
vo.addNotSelectedIfNotExist(connection.getTopicName());
} else {
vo.addSelectedIfNotExist(connection.getTopicName());
}
voMap.put(connection.getClientId(), vo);
}
return new ArrayList<>(voMap.values());
}
}
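
ConnectionsSearchTP above is a project-internal FutureUtil thread pool. Assuming it behaves like a bounded executor with a submit-all-then-wait API, the per-topic fan-out/fan-in is equivalent to this standalone sketch built on the standard CompletableFuture (the topic list and lookupConnections function are placeholders, not part of the source):

import java.util.*;
import java.util.concurrent.*;
import java.util.function.Function;

public class ConnectionsFanOutSketch {
    public static void main(String[] args) throws Exception {
        // Bounded pool, mirroring FutureUtil.init("ConnectionsSearchTP", 5, 5, 500)
        ExecutorService pool = Executors.newFixedThreadPool(5);

        // Placeholder topics and lookup function; the real code calls topicConnectionService
        List<String> topics = Arrays.asList("order-events", "pay-events");
        Function<String, Map<String, Set<String>>> lookupConnections =
                topic -> Collections.singletonMap("user-of-" + topic, Collections.singleton("C#demo"));

        // Fan-out: submit one lookup per topic
        Map<String, CompletableFuture<Map<String, Set<String>>>> futures = new HashMap<>();
        for (String topic : topics) {
            futures.put(topic, CompletableFuture.supplyAsync(() -> lookupConnections.apply(topic), pool));
        }

        // Fan-in: a single wait for all lookups, then merge KafkaUser -> clientIds
        CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0])).get(10, TimeUnit.SECONDS);

        Map<String, Set<String>> merged = new HashMap<>();
        for (CompletableFuture<Map<String, Set<String>>> future : futures.values()) {
            future.join().forEach((user, clients) -> merged.computeIfAbsent(user, k -> new HashSet<>()).addAll(clients));
        }
        System.out.println(merged);

        pool.shutdown();
    }
}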

View File

@@ -7,6 +7,7 @@ import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.TopicOperationResult;
import com.xiaojukeji.kafka.manager.common.entity.ao.ha.HaSwitchTopic;
import com.xiaojukeji.kafka.manager.common.entity.dto.ha.KafkaUserAndClientDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.HaTopicRelationDTO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
@@ -14,13 +15,14 @@ import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.JobLogDO;
import com.xiaojukeji.kafka.manager.common.utils.BackoffUtils;
import com.xiaojukeji.kafka.manager.common.utils.ConvertUtil;
import com.xiaojukeji.kafka.manager.common.utils.HAUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.service.biz.ha.HaTopicManager;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.AdminService;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.service.service.JobLogService;
import com.xiaojukeji.kafka.manager.service.service.TopicManagerService;
import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService;
import com.xiaojukeji.kafka.manager.service.service.ha.HaASRelationService;
import com.xiaojukeji.kafka.manager.service.service.ha.HaKafkaUserService;
import com.xiaojukeji.kafka.manager.service.service.ha.HaTopicService;
@@ -28,6 +30,7 @@ import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Component;
import java.util.*;
@@ -40,9 +43,6 @@ public class HaTopicManagerImpl implements HaTopicManager {
@Autowired
private ClusterService clusterService;
@Autowired
private AuthorityService authorityService;
@Autowired
private HaTopicService haTopicService;
@@ -61,10 +61,14 @@ public class HaTopicManagerImpl implements HaTopicManager {
@Autowired
private JobLogService jobLogService;
@Autowired
private AdminService adminService;
@Override
public Result<HaSwitchTopic> switchHaWithCanRetry(Long newActiveClusterPhyId,
Long newStandbyClusterPhyId,
List<String> switchTopicNameList,
List<KafkaUserAndClientDTO> kafkaUserAndClientIdList,
boolean focus,
boolean firstTriggerExecute,
JobLogDO switchLogTemplate,
@@ -106,7 +110,7 @@ public class HaTopicManagerImpl implements HaTopicManager {
}
// 4. Pre-switch processing
HaSwitchTopic switchTopic = this.prepareSwitching(newStandbyClusterPhyDO, doList, focus, switchLogTemplate);
HaSwitchTopic switchTopic = this.prepareSwitching(newStandbyClusterPhyDO, doList, kafkaUserAndClientIdList, focus, switchLogTemplate);
// 5. Wait 10 seconds outright so the related data has a chance to finish syncing
BackoffUtils.backoff(10000);
@@ -125,7 +129,15 @@ public class HaTopicManagerImpl implements HaTopicManager {
switchTopic.addHaSwitchTopic(this.newStandbyTopicAddFetchConfig(newActiveClusterPhyDO, newStandbyClusterPhyDO, doList, focus, switchLogTemplate, operator));
// 9. Switch closeout
switchTopic.addHaSwitchTopic(this.closeoutSwitching(newActiveClusterPhyDO, newStandbyClusterPhyDO, configUtils.getDKafkaGatewayZK(), doList, focus, switchLogTemplate));
switchTopic.addHaSwitchTopic(this.closeoutSwitching(
newActiveClusterPhyDO,
newStandbyClusterPhyDO,
configUtils.getDKafkaGatewayZK(),
doList,
kafkaUserAndClientIdList,
focus,
switchLogTemplate
));
// 10. Aggregate and record the status results
doList.forEach(elem -> switchTopic.addActiveTopicStatus(elem.getActiveResName(), elem.getStatus()));
@@ -136,6 +148,18 @@ public class HaTopicManagerImpl implements HaTopicManager {
newActiveClusterPhyId, newStandbyClusterPhyId, ConvertUtil.obj2Json(switchTopicNameList), switchTopic, operator
);
if (switchTopic.isFinished()) {
// Once everything has switched, update the HA info
try {
updateHAClient(newActiveClusterPhyId, newStandbyClusterPhyId, kafkaUserAndClientIdList);
} catch (Exception e) {
LOGGER.error(
"method=switchHaWithCanRetry||newActiveClusterPhyId={}||newStandbyClusterPhyId={}||kafkaUserAndClientIdList={}||operator={}||errMsg=exception",
newActiveClusterPhyId, newStandbyClusterPhyId, ConvertUtil.obj2Json(kafkaUserAndClientIdList), operator, e
);
}
}
return Result.buildSuc(switchTopic);
}
@@ -188,6 +212,20 @@ public class HaTopicManagerImpl implements HaTopicManager {
}
Result<Void> rv = haTopicService.deleteHA(relationDO.getActiveClusterPhyId(), relationDO.getStandbyClusterPhyId(), topicName, operator);
// Delete the standby topic resource
if (dto.getRetainStandbyResource() != null && !dto.getRetainStandbyResource()) {
ResultStatus statusEnum = adminService.deleteTopic(
PhysicalClusterMetadataManager.getClusterFromCache(dto.getStandbyClusterId()),
topicName,
operator);
if (statusEnum.getCode() != ResultStatus.SUCCESS.getCode()) {
LOGGER.error(
"method=batchRemoveHaTopic||activeClusterPhyId={}||standbyClusterPhyId={}||topicName={}||result={}||msg=delete standby topic failed.",
dto.getActiveClusterId(), dto.getStandbyClusterId(), topicName, statusEnum
);
}
}
operationResultList.add(TopicOperationResult.buildFrom(dto.getActiveClusterId(), topicName, rv));
}
@@ -200,58 +238,43 @@ public class HaTopicManagerImpl implements HaTopicManager {
jobLogService.addLogAndIgnoreException(switchLogTemplate.setAndCopyNew(new Date(), content));
}
/**
* Pre-switch processing
* 1. On the active cluster, point the active cluster of the KafkaUsers related to the topics to None
*/
private HaSwitchTopic prepareSwitching(ClusterDO oldActiveClusterPhyDO, List<HaASRelationDO> doList, boolean focus, JobLogDO switchLogTemplate) {
// KafkaUsers whose HA has been paused
Set<String> stoppedHaKafkaUserSet = new HashSet<>();
private HaSwitchTopic prepareSwitching(ClusterDO oldActiveClusterPhyDO,
List<HaASRelationDO> doList,
List<KafkaUserAndClientDTO> clientDTOList,
boolean focus,
JobLogDO switchLogTemplate) {
HaSwitchTopic haSwitchTopic = new HaSwitchTopic(true);
boolean allSuccess = true; // everything succeeded
boolean needLog = false; // whether a log entry needs to be written
for (HaASRelationDO relationDO: doList) {
if (!relationDO.getStatus().equals(HaStatusEnum.SWITCHING_PREPARE_CODE)) {
// Not currently in the prepare state
haSwitchTopic.setFinished(true);
continue;
}
needLog = true;
// Get the related KafkaUsers
Set<String> relatedKafkaUserSet = authorityService.getAuthorityByTopic(relationDO.getActiveClusterPhyId(), relationDO.getActiveResName())
.stream()
.map(elem -> elem.getAppId())
.filter(kafkaUser -> !stoppedHaKafkaUserSet.contains(kafkaUser))
.collect(Collectors.toSet());
// Pause HA for the KafkaUsers
for (String kafkaUser: relatedKafkaUserSet) {
Result<Void> rv = haKafkaUserService.setNoneHAInKafka(oldActiveClusterPhyDO.getZookeeper(), kafkaUser);
if (rv.failed() && !focus) {
haSwitchTopic.setFinished(false);
this.saveLogs(switchLogTemplate, String.format("%s:\t失败1分钟后再进行重试", HaStatusEnum.SWITCHING_PREPARE.getMsg(oldActiveClusterPhyDO.getClusterName())));
return haSwitchTopic;
} else if (rv.failed() && focus) {
allSuccess = false;
}
}
// Record the users that have already been handled
stoppedHaKafkaUserSet.addAll(relatedKafkaUserSet);
// Update the topic active/standby status
relationDO.setStatus(HaStatusEnum.SWITCHING_WAITING_IN_SYNC_CODE);
haASRelationService.updateRelationStatus(relationDO.getId(), HaStatusEnum.SWITCHING_WAITING_IN_SYNC_CODE);
// Pre-processing is only needed if some relation is still in the prepare state
boolean needDoIt = doList.stream().anyMatch(elem -> elem.getStatus().equals(HaStatusEnum.SWITCHING_PREPARE_CODE));
if (!needDoIt) {
    // Nothing to do
return haSwitchTopic;
}
if (needLog) {
this.saveLogs(switchLogTemplate, String.format("%s:\t%s", HaStatusEnum.SWITCHING_PREPARE.getMsg(oldActiveClusterPhyDO.getClusterName()), allSuccess? "成功": "存在失败,但进行强制执行,跳过该操作"));
// Pause HA for the KafkaUsers
for (KafkaUserAndClientDTO dto: clientDTOList) {
Result<Void> rv = haKafkaUserService.setNoneHAInKafka(oldActiveClusterPhyDO.getZookeeper(), HAUtils.mergeKafkaUserAndClient(dto.getKafkaUser(), dto.getClientId()));
if (rv.failed() && !focus) {
haSwitchTopic.setFinished(false);
this.saveLogs(switchLogTemplate, String.format("%s:\t失败1分钟后再进行重试", HaStatusEnum.SWITCHING_PREPARE.getMsg(oldActiveClusterPhyDO.getClusterName())));
return haSwitchTopic;
} else if (rv.failed() && focus) {
allSuccess = false;
}
}
// Update the topic active/standby status
doList.forEach(elem -> {
elem.setStatus(HaStatusEnum.SWITCHING_WAITING_IN_SYNC_CODE);
haASRelationService.updateRelationStatus(elem.getId(), HaStatusEnum.SWITCHING_WAITING_IN_SYNC_CODE);
});
this.saveLogs(switchLogTemplate, String.format("%s:\t%s", HaStatusEnum.SWITCHING_PREPARE.getMsg(oldActiveClusterPhyDO.getClusterName()), allSuccess? "成功": "存在失败,但进行强制执行,跳过该操作"));
haSwitchTopic.setFinished(true);
return haSwitchTopic;
}
@@ -412,87 +435,76 @@ public class HaTopicManagerImpl implements HaTopicManager {
* 2. Old standby cluster - point the user's active cluster to the new active cluster
* 3. Gateway - point the user's active cluster to the new active cluster
*/
private HaSwitchTopic closeoutSwitching(ClusterDO newActiveClusterPhyDO, ClusterDO newStandbyClusterPhyDO, String gatewayZK, List<HaASRelationDO> doList, boolean focus, JobLogDO switchLogTemplate) {
// KafkaUsers whose HA has already been activated
Set<String> activeHaKafkaUserSet = new HashSet<>();
private HaSwitchTopic closeoutSwitching(ClusterDO newActiveClusterPhyDO,
ClusterDO newStandbyClusterPhyDO,
String gatewayZK,
List<HaASRelationDO> doList,
List<KafkaUserAndClientDTO> kafkaUserAndClientDTOList,
boolean focus,
JobLogDO switchLogTemplate) {
HaSwitchTopic haSwitchTopic = new HaSwitchTopic(true);
boolean needDoIt = doList.stream().anyMatch(elem -> elem.getStatus().equals(HaStatusEnum.SWITCHING_CLOSEOUT_CODE));
if (!needDoIt) {
    // Nothing to do
return haSwitchTopic;
}
boolean allSuccess = true;
boolean needLog = false;
boolean forceAndNewStandbyFailed = false; // forced switch, but the operation on the new standby still failed
HaSwitchTopic haSwitchTopic = new HaSwitchTopic(true);
for (HaASRelationDO relationDO: doList) {
if (!relationDO.getStatus().equals(HaStatusEnum.SWITCHING_CLOSEOUT_CODE)) {
// Not currently in the closeout state
for (KafkaUserAndClientDTO dto: kafkaUserAndClientDTOList) {
String zkNodeName = HAUtils.mergeKafkaUserAndClient(dto.getKafkaUser(), dto.getClientId());
// Operate on the new active cluster
Result<Void> rv = haKafkaUserService.activeHAInKafka(newActiveClusterPhyDO.getZookeeper(), newActiveClusterPhyDO.getId(), zkNodeName);
if (rv.failed() && !focus) {
haSwitchTopic.setFinished(false);
continue;
this.saveLogs(switchLogTemplate, String.format("%s:\t失败1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName())));
return haSwitchTopic;
} else if (rv.failed() && focus) {
allSuccess = false;
}
needLog = true;
// Get the related KafkaUsers
Set<String> relatedKafkaUserSet = authorityService.getAuthorityByTopic(relationDO.getActiveClusterPhyId(), relationDO.getActiveResName())
.stream()
.map(elem -> elem.getAppId())
.filter(kafkaUser -> !activeHaKafkaUserSet.contains(kafkaUser))
.collect(Collectors.toSet());
for (String kafkaUser: relatedKafkaUserSet) {
// Operate on the new active cluster
Result<Void> rv = haKafkaUserService.activeHAInKafka(newActiveClusterPhyDO.getZookeeper(), newActiveClusterPhyDO.getId(), kafkaUser);
if (rv.failed() && !focus) {
haSwitchTopic.setFinished(false);
this.saveLogs(switchLogTemplate, String.format("%s:\t失败1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName())));
return haSwitchTopic;
} else if (rv.failed() && focus) {
allSuccess = false;
}
// Operate on the new standby cluster; if an error occurs, stop touching its ZK from then on. The new standby's topics are less critical, so skipping is allowed here
rv = null;
if (!forceAndNewStandbyFailed) {
// If an operation on the standby cluster has already failed, skip it outright
rv = haKafkaUserService.activeHAInKafka(newStandbyClusterPhyDO.getZookeeper(), newActiveClusterPhyDO.getId(), kafkaUser);
}
if (rv != null && rv.failed() && !focus) {
haSwitchTopic.setFinished(false);
this.saveLogs(switchLogTemplate, String.format("%s:\t失败1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName())));
return haSwitchTopic;
} else if (rv != null && rv.failed() && focus) {
allSuccess = false;
forceAndNewStandbyFailed = true;
}
// Operate on the gateway
rv = haKafkaUserService.activeHAInKafka(gatewayZK, newActiveClusterPhyDO.getId(), kafkaUser);
if (rv.failed() && !focus) {
haSwitchTopic.setFinished(false);
this.saveLogs(switchLogTemplate, String.format("%s:\t失败1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName())));
return haSwitchTopic;
} else if (rv.failed() && focus) {
allSuccess = false;
}
// Operate on the new standby cluster; if an error occurs, stop touching its ZK from then on. The new standby's topics are less critical, so skipping is allowed here
rv = null;
if (!forceAndNewStandbyFailed) {
// If an operation on the standby cluster has already failed, skip it outright
rv = haKafkaUserService.activeHAInKafka(newStandbyClusterPhyDO.getZookeeper(), newActiveClusterPhyDO.getId(), zkNodeName);
}
// Record the users that have been activated
activeHaKafkaUserSet.addAll(relatedKafkaUserSet);
if (rv != null && rv.failed() && !focus) {
haSwitchTopic.setFinished(false);
this.saveLogs(switchLogTemplate, String.format("%s:\t失败1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName())));
return haSwitchTopic;
} else if (rv != null && rv.failed() && focus) {
allSuccess = false;
forceAndNewStandbyFailed = true;
}
// Update the topic active/standby info
// Operate on the gateway
rv = haKafkaUserService.activeHAInKafka(gatewayZK, newActiveClusterPhyDO.getId(), zkNodeName);
if (rv.failed() && !focus) {
haSwitchTopic.setFinished(false);
this.saveLogs(switchLogTemplate, String.format("%s:\t失败1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName())));
return haSwitchTopic;
} else if (rv.failed() && focus) {
allSuccess = false;
}
}
// Update the topic active/standby info
doList.forEach(elem -> {
HaASRelationDO newHaASRelationDO = new HaASRelationDO(
newActiveClusterPhyDO.getId(), relationDO.getActiveResName(),
newStandbyClusterPhyDO.getId(), relationDO.getStandbyResName(),
newActiveClusterPhyDO.getId(), elem.getActiveResName(),
newStandbyClusterPhyDO.getId(), elem.getStandbyResName(),
HaResTypeEnum.TOPIC.getCode(),
HaStatusEnum.STABLE_CODE
);
newHaASRelationDO.setId(relationDO.getId());
newHaASRelationDO.setId(elem.getId());
haASRelationService.updateById(newHaASRelationDO);
}
if (!needLog) {
return haSwitchTopic;
}
});
this.saveLogs(switchLogTemplate, String.format("%s:\t%s", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName()), allSuccess? "成功": "存在失败,但进行强制执行,跳过该操作"));
return haSwitchTopic;
@@ -556,4 +568,45 @@ public class HaTopicManagerImpl implements HaTopicManager {
return Result.buildSuc(relationDO);
}
private void updateHAClient(Long newActiveClusterPhyId,
Long newStandbyClusterPhyId,
List<KafkaUserAndClientDTO> kafkaUserAndClientIdList) {
if (ValidateUtils.isEmptyList(kafkaUserAndClientIdList)) {
return;
}
List<HaASRelationDO> doList = haASRelationService.listAllHAFromDB(newActiveClusterPhyId, HaResTypeEnum.KAFKA_USER_AND_CLIENT);
Map<String, HaASRelationDO> resNameMap = new HashMap<>();
doList.forEach(elem -> resNameMap.put(elem.getActiveResName(), elem));
for (KafkaUserAndClientDTO dto: kafkaUserAndClientIdList) {
if (ValidateUtils.isBlank(dto.getClientId())) {
continue;
}
String resName = HAUtils.mergeKafkaUserAndClient(dto.getKafkaUser(), dto.getClientId());
HaASRelationDO newDO = new HaASRelationDO(
newActiveClusterPhyId,
resName,
newStandbyClusterPhyId,
resName,
HaResTypeEnum.KAFKA_USER_AND_CLIENT.getCode(),
HaStatusEnum.STABLE_CODE
);
HaASRelationDO oldDO = resNameMap.remove(resName);
if (oldDO != null) {
newDO.setId(oldDO.getId());
haASRelationService.updateById(newDO);
} else {
try {
haASRelationService.addHAToDB(newDO);
} catch (DuplicateKeyException dke) {
// ignore
}
}
}
}
}
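
updateHAClient keys the KAFKA_USER_AND_CLIENT relations by a single resName built with HAUtils.mergeKafkaUserAndClient, and listAllHAClient later splits it back apart. The encoding itself is not shown in this diff; the following is only a sketch of what such a codec could look like, assuming a '#'-delimited format (the delimiter and helper names here are assumptions, not confirmed by the source):

import java.util.AbstractMap;
import java.util.Map;

public final class KafkaUserAndClientCodecSketch {
    // Assumed delimiter; the real HAUtils may encode the pair differently
    private static final String SEP = "#";

    private KafkaUserAndClientCodecSketch() {
    }

    // Merge a KafkaUser and an optional clientId into one resource name
    public static String merge(String kafkaUser, String clientId) {
        return (clientId == null || clientId.isEmpty()) ? kafkaUser : kafkaUser + SEP + clientId;
    }

    // Split on the first delimiter only, since HA clientIds themselves contain '#' ("P#..." / "C#...")
    public static Map.Entry<String, String> split(String resName) {
        int idx = resName.indexOf(SEP);
        if (idx < 0) {
            return new AbstractMap.SimpleEntry<>(resName, "");
        }
        return new AbstractMap.SimpleEntry<>(resName.substring(0, idx), resName.substring(idx + 1));
    }
}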

View File

@@ -22,10 +22,7 @@ import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASSwitchJobDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASSwitchSubJobDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.JobLogDO;
import com.xiaojukeji.kafka.manager.common.entity.vo.ha.job.HaJobDetailVO;
import com.xiaojukeji.kafka.manager.common.utils.BackoffUtils;
import com.xiaojukeji.kafka.manager.common.utils.ConvertUtil;
import com.xiaojukeji.kafka.manager.common.utils.FutureUtil;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.utils.*;
import com.xiaojukeji.kafka.manager.service.biz.ha.HaAppManager;
import com.xiaojukeji.kafka.manager.service.biz.ha.HaTopicManager;
import com.xiaojukeji.kafka.manager.service.biz.job.HaASSwitchJobManager;
@@ -95,19 +92,20 @@ public class HaASSwitchJobManagerImpl implements HaASSwitchJobManager {
LOGGER.info("method=createJob||activeClusterPhyId={}||switchTopics={}||operator={}", dto.getActiveClusterPhyId(), ConvertUtil.obj2Json(haTopicSetResult.getData()), operator);
// 2. Check whether all topics related to the KafkaUsers are covered
if (dto.getMustContainAllKafkaUserTopics() != null
&& dto.getMustContainAllKafkaUserTopics()
&& (dto.getAll() == null || !dto.getAll())
&& !haAppManager.isContainAllRelateAppTopics(dto.getActiveClusterPhyId(), dto.getTopicNameList())) {
return Result.buildFromRSAndMsg(ResultStatus.OPERATION_FORBIDDEN, "存在KafkaUser关联的Topic未选中");
}
// // 2. Check whether all topics related to the KafkaUsers are covered
// if (dto.getMustContainAllKafkaUserTopics() != null
// && dto.getMustContainAllKafkaUserTopics()
// && (dto.getAll() == null || !dto.getAll())
// && !haAppManager.isContainAllRelateAppTopics(dto.getActiveClusterPhyId(), dto.getTopicNameList())) {
// return Result.buildFromRSAndMsg(ResultStatus.OPERATION_FORBIDDEN, "存在KafkaUser关联的Topic未选中");
// }
// 3. Create the job
Result<Long> longResult = haASSwitchJobService.createJob(
dto.getActiveClusterPhyId(),
dto.getStandbyClusterPhyId(),
new ArrayList<>(haTopicSetResult.getData()),
dto.getKafkaUserAndClientIdList(),
operator
);
if (longResult.failed()) {
@@ -176,6 +174,7 @@ public class HaASSwitchJobManagerImpl implements HaASSwitchJobManager {
jobDO.getActiveClusterPhyId(),
jobDO.getStandbyClusterPhyId(),
subJobDOList.stream().map(elem -> elem.getActiveResName()).collect(Collectors.toList()),
jobDO.getExtendRawData(),
focus,
firstTriggerExecute,
new JobLogDO(JobLogBizTypEnum.HA_SWITCH_JOB_LOG.getCode(), String.valueOf(jobId)),

View File

@@ -1,10 +1,13 @@
package com.xiaojukeji.kafka.manager.service.service.gateway;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* @author zhongyuankai
@@ -21,6 +24,14 @@ public interface TopicConnectionService {
Date startTime,
Date endTime);
Result<Map<String/*KafkaUser*/, Set<String>/*ClientID*/>> getHaKafkaUserAndClientIdByTopicName(Long firstClusterId,
Long secondClusterId,
String topicName,
Date startTime,
Date endTime);
Set<String> getKafkaUserAndClientIdTopicNames(Set<Long> clusterIdSet, String kafkaUser, String clientId, Date startTime, Date endTime);
/**
* Query connection info
*/
@@ -37,6 +48,8 @@ public interface TopicConnectionService {
Date startTime,
Date endTime);
Result<List<TopicConnectionDO>> getByClusterAndAppId(Long firstClusterId, Long secondClusterId, String appId, Date startTime, Date endTime);
/**
* Check whether the topic has any connections
*/

View File

@@ -1,6 +1,8 @@
package com.xiaojukeji.kafka.manager.service.service.gateway.impl;
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection;
import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
@@ -67,6 +69,71 @@ public class TopicConnectionServiceImpl implements TopicConnectionService {
return getByTopicName(clusterId, doList);
}
@Override
public Result<Map<String, Set<String>>> getHaKafkaUserAndClientIdByTopicName(Long firstClusterId,
Long secondClusterId,
String topicName,
Date startTime,
Date endTime) {
List<TopicConnectionDO> doList = new ArrayList<>();
try {
if (firstClusterId != null) {
doList.addAll(topicConnectionDao.getByTopicName(firstClusterId, topicName, startTime, endTime));
}
} catch (Exception e) {
LOGGER.error("get topic connections failed, firstClusterId:{} topicName:{}.", firstClusterId, topicName, e);
return Result.buildFromRSAndMsg(ResultStatus.MYSQL_ERROR, e.getMessage());
}
try {
if (secondClusterId != null) {
doList.addAll(topicConnectionDao.getByTopicName(secondClusterId, topicName, startTime, endTime));
}
} catch (Exception e) {
LOGGER.error("get topic connections failed, secondClusterId:{} topicName:{}.", secondClusterId, topicName, e);
return Result.buildFromRSAndMsg(ResultStatus.MYSQL_ERROR, e.getMessage());
}
if (ValidateUtils.isEmptyList(doList)) {
return Result.buildSuc(new HashMap<>());
}
Map<String, Set<String>> userAndClientMap = new HashMap<>();
for (TopicConnectionDO po: doList) {
if (!po.getClientId().startsWith("P#") && !po.getClientId().startsWith("C#")) {
// Ignore non-HA clientIds
continue;
}
userAndClientMap.putIfAbsent(po.getAppId(), new HashSet<>());
userAndClientMap.get(po.getAppId()).add(po.getClientId());
}
return Result.buildSuc(userAndClientMap);
}
@Override
public Set<String> getKafkaUserAndClientIdTopicNames(Set<Long> clusterIdSet, String kafkaUser, String clientId, Date startTime, Date endTime) {
List<TopicConnectionDO> doList = null;
try {
doList = topicConnectionDao.getByAppId(kafkaUser, startTime, endTime);
} catch (Exception e) {
LOGGER.error("get topic connections failed, kafkaUser:{}.", kafkaUser, e);
}
if (ValidateUtils.isEmptyList(doList)) {
return new HashSet<>();
}
return doList
.stream()
.filter(elem -> elem.getClientId().equals(clientId) && clusterIdSet.contains(elem.getClusterId()))
.map(item -> item.getTopicName())
.collect(Collectors.toSet());
}
@Override
public List<TopicConnection> getByTopicName(Long clusterId,
String topicName,
@@ -102,6 +169,36 @@ public class TopicConnectionServiceImpl implements TopicConnectionService {
return getByTopicName(null, doList);
}
@Override
public Result<List<TopicConnectionDO>> getByClusterAndAppId(Long firstClusterId, Long secondClusterId, String appId, Date startTime, Date endTime) {
List<TopicConnectionDO> doList = new ArrayList<>();
try {
if (firstClusterId != null) {
doList.addAll(topicConnectionDao.getByClusterAndAppId(firstClusterId, appId, startTime, endTime));
}
} catch (Exception e) {
LOGGER.error("get topic connections failed, firstClusterId:{} appId:{}.", firstClusterId, appId, e);
return Result.buildFromRSAndMsg(ResultStatus.MYSQL_ERROR, e.getMessage());
}
try {
if (secondClusterId != null) {
doList.addAll(topicConnectionDao.getByClusterAndAppId(secondClusterId, appId, startTime, endTime));
}
} catch (Exception e) {
LOGGER.error("get topic connections failed, secondClusterId:{} appId:{}.", secondClusterId, appId, e);
return Result.buildFromRSAndMsg(ResultStatus.MYSQL_ERROR, e.getMessage());
}
if (ValidateUtils.isEmptyList(doList)) {
return Result.buildSuc(new ArrayList<>());
}
return Result.buildSuc(doList);
}
@Override
public boolean isExistConnection(Long clusterId,
String topicName,
@@ -210,6 +307,10 @@ public class TopicConnectionServiceImpl implements TopicConnectionService {
LOGGER.error("get hostname failed. ip:{}.", connectionDO.getIp(), e);
}
dto.setHostname(hostName.replace(KafkaConstant.BROKER_HOST_NAME_SUFFIX, ""));
dto.setClientId(connectionDO.getClientId());
dto.setRealConnectTime(connectionDO.getRealConnectTime());
dto.setCreateTime(connectionDO.getCreateTime().getTime());
return dto;
}
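
getHaKafkaUserAndClientIdByTopicName only counts a connection as HA-managed when its clientId carries a "P#" or "C#" prefix (presumably producer and consumer connections). A self-contained sketch of that filter-and-group step (the sample rows are invented):

import java.util.*;

public class HaClientFilterDemo {
    public static void main(String[] args) {
        // (appId, clientId) pairs as they would come back from topic_connections; sample data only
        String[][] rows = {
                {"app-01", "C#settle-consumer"},   // HA consumer
                {"app-01", "P#settle-producer"},   // HA producer
                {"app-02", "plain-client"},        // non-HA, filtered out
        };

        Map<String, Set<String>> userAndClientMap = new HashMap<>();
        for (String[] row : rows) {
            String clientId = row[1];
            if (!clientId.startsWith("P#") && !clientId.startsWith("C#")) {
                continue; // ignore non-HA clientIds, same rule as the service impl
            }
            userAndClientMap.computeIfAbsent(row[0], k -> new HashSet<>()).add(clientId);
        }

        // e.g. {app-01=[C#settle-consumer, P#settle-producer]}
        System.out.println(userAndClientMap);
    }
}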

View File

@@ -5,6 +5,8 @@ import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO;
import java.util.List;
import java.util.Map;
import java.util.Set;
public interface HaASRelationService {
Result<Void> replaceTopicRelationsToDB(Long standbyClusterPhyId, List<HaASRelationDO> topicRelationDOList);
@@ -53,6 +55,8 @@ public interface HaASRelationService {
*/
List<HaASRelationDO> listAllHAFromDB(Long firstClusterPhyId, HaResTypeEnum resTypeEnum);
Map<String, Set<String>> listAllHAClient(Long firstClusterPhyId, Set<String> kafkaUserSet);
/**
* Get the active/standby relations
*/

View File

@@ -4,6 +4,7 @@ package com.xiaojukeji.kafka.manager.service.service.ha;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ao.ha.job.HaJobDetail;
import com.xiaojukeji.kafka.manager.common.entity.ao.ha.job.HaSubJobExtendData;
import com.xiaojukeji.kafka.manager.common.entity.dto.ha.KafkaUserAndClientDTO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASSwitchJobDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASSwitchSubJobDO;
@@ -14,7 +15,11 @@ public interface HaASSwitchJobService {
/**
* Create a job
*/
Result<Long> createJob(Long activeClusterPhyId, Long standbyClusterPhyId, List<String> topicNameList, String operator);
Result<Long> createJob(Long activeClusterPhyId,
Long standbyClusterPhyId,
List<String> topicNameList,
List<KafkaUserAndClientDTO> kafkaUserAndClientList,
String operator);
/**
* Update the job status

View File

@@ -6,6 +6,8 @@ import com.xiaojukeji.kafka.manager.common.bizenum.ha.HaStatusEnum;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO;
import com.xiaojukeji.kafka.manager.common.utils.HAUtils;
import com.xiaojukeji.kafka.manager.common.utils.Tuple;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.dao.ha.HaASRelationDao;
import com.xiaojukeji.kafka.manager.service.service.ha.HaASRelationService;
@@ -14,9 +16,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -177,6 +177,34 @@ public class HaASRelationServiceImpl implements HaASRelationService {
return doList;
}
@Override
public Map<String, Set<String>> listAllHAClient(Long firstClusterPhyId, Set<String> kafkaUserSet) {
LambdaQueryWrapper<HaASRelationDO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
lambdaQueryWrapper.eq(HaASRelationDO::getResType, HaResTypeEnum.KAFKA_USER_AND_CLIENT.getCode());
lambdaQueryWrapper.and(lambda ->
lambda.eq(HaASRelationDO::getActiveClusterPhyId, firstClusterPhyId).or().eq(HaASRelationDO::getStandbyClusterPhyId, firstClusterPhyId)
);
// Query the HA list
List<HaASRelationDO> doList = haASRelationDao.selectList(lambdaQueryWrapper);
if (ValidateUtils.isNull(doList)) {
return new HashMap<>();
}
Map<String, Set<String>> haClientMap = new HashMap<>();
doList.forEach(elem -> {
Tuple<String, String> data = HAUtils.splitKafkaUserAndClient(elem.getActiveResName());
if (data == null || !kafkaUserSet.contains(data.getV1())) {
return;
}
haClientMap.putIfAbsent(data.getV1(), new HashSet<>());
haClientMap.get(data.getV1()).add(data.getV2());
});
return haClientMap;
}
@Override
public List<HaASRelationDO> listAllHAFromDB(Long firstClusterPhyId, Long secondClusterPhyId, HaResTypeEnum resTypeEnum) {
// Query the HA list

View File

@@ -6,6 +6,7 @@ import com.xiaojukeji.kafka.manager.common.bizenum.ha.job.HaJobStatusEnum;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.ha.job.*;
import com.xiaojukeji.kafka.manager.common.entity.dto.ha.KafkaUserAndClientDTO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASSwitchJobDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASSwitchSubJobDO;
import com.xiaojukeji.kafka.manager.common.utils.ConvertUtil;
@@ -35,10 +36,22 @@ public class HaASSwitchJobServiceImpl implements HaASSwitchJobService {
@Override
@Transactional
public Result<Long> createJob(Long activeClusterPhyId, Long standbyClusterPhyId, List<String> topicNameList, String operator) {
public Result<Long> createJob(Long activeClusterPhyId,
Long standbyClusterPhyId,
List<String> topicNameList,
List<KafkaUserAndClientDTO> kafkaUserAndClientList,
String operator) {
try {
// 父任务
HaASSwitchJobDO jobDO = new HaASSwitchJobDO(activeClusterPhyId, standbyClusterPhyId, HaJobStatusEnum.RUNNING.getStatus(), operator);
HaASSwitchJobDO jobDO = new HaASSwitchJobDO(
activeClusterPhyId,
standbyClusterPhyId,
ValidateUtils.isEmptyList(kafkaUserAndClientList)? 0: 1,
kafkaUserAndClientList,
HaJobStatusEnum.RUNNING.getStatus(),
operator
);
haASSwitchJobDao.insert(jobDO);
// Sub-jobs

View File

@@ -6,10 +6,15 @@ import kafka.admin.AdminUtils;
import kafka.admin.AdminUtils$;
import kafka.server.ConfigType;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.security.JaasUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Properties;
@@ -40,7 +45,7 @@ public class HaKafkaUserCommands {
props.putAll(modifiedProps);
// Modify the config. changeUserOrUserClientIdConfig is not used here because that method performs argument checks
AdminUtils$.MODULE$.kafka$admin$AdminUtils$$changeEntityConfig(zkUtils, ConfigType.User(), kafkaUser, props);
AdminUtils$.MODULE$.kafka$admin$AdminUtils$$changeEntityConfig(zkUtils, ConfigType.User(), sanitize(kafkaUser), props);
} catch (Exception e) {
LOGGER.error("method=changeHaUserConfig||zookeeper={}||kafkaUser={}||modifiedProps={}||errMsg=exception", zookeeper, kafkaUser, modifiedProps, e);
return false;
@@ -73,7 +78,7 @@ public class HaKafkaUserCommands {
}
// Modify the config. changeUserOrUserClientIdConfig is not used here because that method performs argument checks
AdminUtils$.MODULE$.kafka$admin$AdminUtils$$changeEntityConfig(zkUtils, ConfigType.User(), kafkaUser, presentProps);
AdminUtils$.MODULE$.kafka$admin$AdminUtils$$changeEntityConfig(zkUtils, ConfigType.User(), sanitize(kafkaUser), presentProps);
return true;
}catch (Exception e){
@@ -90,4 +95,37 @@ public class HaKafkaUserCommands {
private HaKafkaUserCommands() {
}
private static String sanitize(String name) {
String encoded = "";
try {
encoded = URLEncoder.encode(name, StandardCharsets.UTF_8.name());
StringBuilder builder = new StringBuilder();
for (int i = 0; i < encoded.length(); i++) {
char c = encoded.charAt(i);
if (c == '*') { // Metric ObjectName treats * as pattern
builder.append("%2A");
} else if (c == '+') { // Space URL-encoded as +, replace with percent encoding
builder.append("%20");
} else {
builder.append(c);
}
}
return builder.toString();
} catch (UnsupportedEncodingException e) {
throw new KafkaException(e);
}
}
/**
* Desanitize name that was URL-encoded using {@link #sanitize(String)}. This
* is used to obtain the desanitized version of node names in ZooKeeper.
*/
private static String desanitize(String name) {
try {
return URLDecoder.decode(name, StandardCharsets.UTF_8.name());
} catch (UnsupportedEncodingException e) {
throw new KafkaException(e);
}
}
}
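
sanitize mirrors Kafka's ZooKeeper node-name encoding: URL-encode, then re-encode '*' (which JMX treats as a pattern) and '+' (the URL encoding of a space). A quick standalone round-trip check of those rules, using the same URLEncoder/URLDecoder calls as the methods above:

import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

public class SanitizeRoundTrip {
    public static void main(String[] args) throws UnsupportedEncodingException {
        String name = "user/1 a*b+c";

        // Same steps as sanitize(): URL-encode, then fix '*' and '+' for JMX/ZK safety
        String encoded = URLEncoder.encode(name, StandardCharsets.UTF_8.name())
                .replace("*", "%2A")
                .replace("+", "%20");
        System.out.println(encoded); // user%2F1%20a%2Ab%2Bc

        // desanitize() is a plain URL-decode and restores the original name
        String decoded = URLDecoder.decode(encoded, StandardCharsets.UTF_8.name());
        System.out.println(decoded.equals(name)); // true
    }
}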

View File

@@ -16,4 +16,6 @@ public interface TopicConnectionDao {
List<TopicConnectionDO> getByTopicName(Long clusterId, String topicName, Date startTime, Date endTime);
List<TopicConnectionDO> getByAppId(String appId, Date startTime, Date endTime);
List<TopicConnectionDO> getByClusterAndAppId(Long clusterId, String appId, Date startTime, Date endTime);
}

View File

@@ -58,4 +58,14 @@ public class TopicConnectionDaoImpl implements TopicConnectionDao {
params.put("endTime", endTime);
return sqlSession.selectList("TopicConnectionDao.getByAppId", params);
}
@Override
public List<TopicConnectionDO> getByClusterAndAppId(Long clusterId, String appId, Date startTime, Date endTime) {
Map<String, Object> params = new HashMap<>(4);
params.put("appId", appId);
params.put("clusterId", clusterId);
params.put("startTime", startTime);
params.put("endTime", endTime);
return sqlSession.selectList("TopicConnectionDao.getByClusterAndAppId", params);
}
}

View File

@@ -10,6 +10,8 @@
<result column="active_cluster_phy_id" property="activeClusterPhyId" />
<result column="standby_cluster_phy_id" property="standbyClusterPhyId" />
<result column="job_status" property="jobStatus" />
<result column="type" property="type" />
<result column="extend_data" property="extendData" />
<result column="operator" property="operator" />
</resultMap>
@@ -18,9 +20,9 @@
useGeneratedKeys="true"
keyProperty="id">
INSERT INTO ks_km_physical_cluster
(active_cluster_phy_id, standby_cluster_phy_id, job_status, operator)
(active_cluster_phy_id, standby_cluster_phy_id, job_status, `type`, extend_data, operator)
VALUES
(#{activeClusterPhyId}, #{standbyClusterPhyId}, #{jobStatus}, #{operator})
(#{activeClusterPhyId}, #{standbyClusterPhyId}, #{jobStatus}, #{type}, #{extendData}, #{operator})
</insert>
<select id="listAllLatest" resultMap="HaASSwitchJobMap">

View File

@@ -10,31 +10,27 @@
<result property="appId" column="app_id"/>
<result property="ip" column="ip"/>
<result property="clientVersion" column="client_version"/>
<result property="clientId" column="client_id"/>
<result property="realConnectTime" column="real_connect_time"/>
<result property="createTime" column="create_time"/>
</resultMap>
<insert id="batchReplace" parameterType="java.util.List">
REPLACE INTO topic_connections (
cluster_id,
topic_name,
`type`,
app_id,
ip,
client_version,
create_time
)
VALUES
<foreach collection="list" item="item" index="index" separator=",">
(
#{item.clusterId},
#{item.topicName},
#{item.type},
#{item.appId},
#{item.ip},
#{item.clientVersion},
#{item.createTime}
)
insert into topic_connections (cluster_id, topic_name, `type`, app_id, ip, client_version, client_id, real_connect_time, create_time)
values
<foreach collection="list" item="item" separator=",">
(#{item.clusterId}, #{item.topicName}, #{item.type}, #{item.appId}, #{item.ip}, #{item.clientVersion}, #{item.clientId}, #{item.realConnectTime}, #{item.createTime})
</foreach>
on duplicate key update
real_connect_time = IF(real_connect_time > VALUES(real_connect_time), real_connect_time, VALUES(real_connect_time)),
cluster_id = VALUES(cluster_id),
topic_name = VALUES(topic_name),
`type` = VALUES(`type`),
app_id = VALUES(app_id),
ip = VALUES(ip),
client_version = VALUES(client_version),
client_id = VALUES(client_id),
create_time = VALUES(create_time)
</insert>
<select id="getByTopicName" parameterType="java.util.Map" resultMap="TopicConnectionMap">
@@ -53,4 +49,14 @@
AND create_time >= #{startTime} AND #{endTime} >= create_time
]]>
</select>
<select id="getByClusterAndAppId" parameterType="java.util.Map" resultMap="TopicConnectionMap">
<![CDATA[
SELECT * FROM topic_connections
WHERE app_id = #{appId}
AND cluster_id = #{clusterId}
AND create_time >= #{startTime}
AND #{endTime} >= create_time
]]>
</select>
</mapper>
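
The rewritten batchReplace switches from REPLACE INTO to INSERT ... ON DUPLICATE KEY UPDATE, so a re-reported connection updates the existing row instead of deleting and recreating it, and the IF(...) clause keeps the larger real_connect_time. A minimal sketch of the effective merge rule for that column (the timestamps are invented):

public class KeepMaxUpsertDemo {
    public static void main(String[] args) {
        long existingRealConnectTime = 1_676_000_000_000L; // value already stored in the row
        long incomingRealConnectTime = 1_676_000_500_000L; // value carried by the new batch

        // IF(real_connect_time > VALUES(real_connect_time), real_connect_time, VALUES(real_connect_time))
        long merged = Math.max(existingRealConnectTime, incomingRealConnectTime);

        System.out.println(merged == incomingRealConnectTime); // true: the newer timestamp wins
    }
}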

View File

@@ -1,4 +1,4 @@
package com.xiaojukeji.kafka.manager.task.dispatch.op;
package com.xiaojukeji.kafka.manager.task.dispatch.ha;
import com.xiaojukeji.kafka.manager.service.biz.job.HaASSwitchJobManager;
import com.xiaojukeji.kafka.manager.service.service.ha.HaASSwitchJobService;

View File

@@ -0,0 +1,156 @@
//package com.xiaojukeji.kafka.manager.task.dispatch.ha;
//
//import com.xiaojukeji.kafka.manager.common.bizenum.ha.HaResTypeEnum;
//import com.xiaojukeji.kafka.manager.common.bizenum.ha.job.HaJobStatusEnum;
//import com.xiaojukeji.kafka.manager.common.constant.ConfigConstant;
//import com.xiaojukeji.kafka.manager.common.constant.Constant;
//import com.xiaojukeji.kafka.manager.common.entity.dto.ha.ASSwitchJobDTO;
//import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
//import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO;
//import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASSwitchJobDO;
//import com.xiaojukeji.kafka.manager.common.utils.HAUtils;
//import com.xiaojukeji.kafka.manager.common.utils.Tuple;
//import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
//import com.xiaojukeji.kafka.manager.service.biz.job.HaASSwitchJobManager;
//import com.xiaojukeji.kafka.manager.service.service.ClusterService;
//import com.xiaojukeji.kafka.manager.service.service.ConfigService;
//import com.xiaojukeji.kafka.manager.service.service.gateway.TopicConnectionService;
//import com.xiaojukeji.kafka.manager.service.service.ha.HaASRelationService;
//import com.xiaojukeji.kafka.manager.service.service.ha.HaASSwitchJobService;
//import com.xiaojukeji.kafka.manager.task.component.AbstractScheduledTask;
//import com.xiaojukeji.kafka.manager.task.component.CustomScheduled;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Component;
//
//import java.util.*;
//import java.util.function.Function;
//import java.util.stream.Collectors;
//
///**
// * Active/standby switch task
// */
//@Component
//@CustomScheduled(name = "HandleHaClientNewTopic",
// cron = "0 0/2 * * * ?",
// threadNum = 1,
// description = "处理HAClient的新增Topic")
//public class HandleHaClientNewTopic extends AbstractScheduledTask<ClusterDO> {
// @Autowired
// private ClusterService clusterService;
//
// @Autowired
// private HaASRelationService haASRelationService;
//
// @Autowired
// private TopicConnectionService topicConnectionService;
//
// @Autowired
// private HaASSwitchJobManager haASSwitchJobManager;
//
// @Autowired
// private HaASSwitchJobService haASSwitchJobService;
//
// @Autowired
// private ConfigService configService;
//
// @Override
// public List<ClusterDO> listAllTasks() {
// return clusterService.list();
// }
//
// @Override
// public void processTask(ClusterDO clusterDO) {
// if (this.existRunningTask(clusterDO.getId())) {
// // A task is already running
// return;
// }
//
// // Get the clients that already have HA established
// List<HaASRelationDO> doList = haASRelationService.listAllHAFromDB(clusterDO.getId(), HaResTypeEnum.KAFKA_USER_AND_CLIENT);
//
// // Get the topics that already have HA established
// Map<String, HaASRelationDO> nameMap = haASRelationService.listAllHAFromDB(clusterDO.getId(), HaResTypeEnum.TOPIC)
// .stream()
// .collect(Collectors.toMap(HaASRelationDO::getActiveResName, Function.identity()));
//
// // New active/standby clusters & the topics that need switching
// Long newActiveClusterId = null;
// Long newStandbyClusterId = null;
// Map<String, HaASRelationDO> needSwitchTopicMap = new HashMap<>();
//
// // Look up the topics related to each clientId
// for (HaASRelationDO asRelationDO: doList) {
// if (newActiveClusterId != null && !newActiveClusterId.equals(asRelationDO.getActiveClusterPhyId())) {
// // A single switch can only have one active cluster ID, not several. Ignore entries that do not match
// continue;
// }
//
// Tuple<String, String> userAndClient = HAUtils.splitKafkaUserAndClient(asRelationDO.getActiveResName());
// if (userAndClient == null || ValidateUtils.isBlank(userAndClient.getV2())) {
// continue;
// }
//
// // Get the topics for this client
// Set<String> topicNameSet = topicConnectionService.getKafkaUserAndClientIdTopicNames(
// new HashSet<>(Arrays.asList(asRelationDO.getActiveClusterPhyId(), asRelationDO.getStandbyClusterPhyId())),
// userAndClient.getV1(),
// userAndClient.getV2(),
// new Date(System.currentTimeMillis() - configService.getLongValue(ConfigConstant.HA_CONNECTION_ACTIVE_TIME_UNIT_MIN, 20L) * 60L * 1000L),
// new Date()
// );
//
// // Iterate over the topics and check whether the active/standby relation matches expectations
// for (String topicName: topicNameSet) {
// HaASRelationDO topicRelation = nameMap.get(topicName);
// if (topicRelation == null
// || asRelationDO.getActiveClusterPhyId().equals(topicRelation.getActiveClusterPhyId())) {
// // The topic relation is null (HA not established), so ignore this topic
// // HA is established and the topic's active/standby info already matches this clientId, so no switch is needed either
// continue;
// }
//
// // The active/standby info differs, so perform a switch
// if (needSwitchTopicMap.isEmpty()) {
// newActiveClusterId = asRelationDO.getActiveClusterPhyId();
// newStandbyClusterId = asRelationDO.getStandbyClusterPhyId();
// }
//
// needSwitchTopicMap.put(topicName, topicRelation);
// }
// }
//
// if (this.existRunningTask(clusterDO.getId())) {
// // Check again whether a task is already running
// return;
// }
//
// // Create the job
// haASSwitchJobManager.createJob(
// this.convert2ASSwitchJobDTO(newActiveClusterId, newStandbyClusterId, new ArrayList<>(needSwitchTopicMap.values())),
// Constant.DEFAULT_USER_NAME
// );
// }
//
// private ASSwitchJobDTO convert2ASSwitchJobDTO(Long newActiveClusterId, Long newStandbyClusterId, List<HaASRelationDO> doList) {
// ASSwitchJobDTO dto = new ASSwitchJobDTO();
// dto.setAll(false);
// dto.setMustContainAllKafkaUserTopics(false);
// dto.setActiveClusterPhyId(newActiveClusterId);
// dto.setStandbyClusterPhyId(newStandbyClusterId);
// dto.setTopicNameList(doList.stream().map(elem -> elem.getActiveResName()).collect(Collectors.toList()));
//            dto.setKafkaUserAndClientIdList(new ArrayList<>()); // the clientId or kafkaUser has already been switched, so the background job does not need this step
//
// return dto;
// }
//
// private boolean existRunningTask(Long clusterPhyId) {
// Map<Long/*集群ID*/, HaASSwitchJobDO> jobMap = haASSwitchJobService.listClusterLatestJobs();
//
// HaASSwitchJobDO jobDO = jobMap.remove(clusterPhyId);
// if (jobDO == null || !HaJobStatusEnum.isRunning(jobDO.getJobStatus())) {
// return false;
// }
//
// return true;
// }
//}

View File

@@ -16,6 +16,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.util.HashSet;
import java.util.List;
/**
@@ -52,6 +53,10 @@ public class RdAppController {
@PostMapping(value = "apps/relate-topics")
@ResponseBody
public Result<List<AppRelateTopicsVO>> appRelateTopics(@Validated @RequestBody AppRelateTopicsDTO dto) {
return haAppManager.appRelateTopics(dto.getClusterPhyId(), dto.getFilterTopicNameList());
if (dto.getUseKafkaUserAndClientId() != null && dto.getUseKafkaUserAndClientId()) {
return haAppManager.appAndClientRelateTopics(dto.getClusterPhyId(), new HashSet<>(dto.getFilterTopicNameList()));
}
return haAppManager.appRelateTopics(dto.getHa(), dto.getClusterPhyId(), dto.getFilterTopicNameList());
}
}

View File

@@ -16,7 +16,7 @@
</parent>
<properties>
<kafka-manager.revision>2.8.0_e</kafka-manager.revision>
<kafka-manager.revision>2.8.1_e</kafka-manager.revision>
<spring.boot.version>2.1.18.RELEASE</spring.boot.version>
<swagger2.version>2.9.2</swagger2.version>
<swagger.version>1.5.21</swagger.version>