diff --git a/kafka-manager-console/src/container/modal/admin/SwitchTaskLog.tsx b/kafka-manager-console/src/container/modal/admin/SwitchTaskLog.tsx
new file mode 100644
index 00000000..297f2d08
--- /dev/null
+++ b/kafka-manager-console/src/container/modal/admin/SwitchTaskLog.tsx
@@ -0,0 +1,300 @@
+import * as React from 'react';
+import { Modal, Progress, Tooltip } from 'antd';
+import { IMetaData } from 'types/base-type';
+import { Alert, Badge, Button, Input, message, notification, Table } from 'component/antd';
+import { getJobDetail, getJobState, getJobLog, switchAsJobs } from 'lib/api';
+import moment from 'moment';
+import { timeFormat } from 'constants/strategy';
+
+interface IProps {
+ reload: any;
+ visible?: boolean;
+ handleVisible?: any;
+ currentCluster?: IMetaData;
+}
+
+interface IJobState {
+ failedNu: number;
+ jobNu: number;
+ runningNu: number;
+ successNu: number;
+ waitingNu: number;
+ runningInTimeoutNu: number;
+ progress: number;
+}
+
+interface IJobDetail {
+ standbyClusterPhyId: number;
+ status: number;
+ sumLag: number;
+ timeoutUnitSecConfig: number;
+ topicName: string;
+ activeClusterPhyName: string;
+ standbyClusterPhyName: string;
+}
+
+interface ILog {
+ bizKeyword: string;
+ bizType: number;
+ content: string;
+ id: number;
+ printTime: number;
+}
+interface IJobLog {
+ logList: ILog[];
+ endLogId: number;
+}
+const STATUS_MAP = {
+ '-1': '未知',
+ '30': '运行中',
+ '32': '超时运行中',
+ '101': '成功',
+ '102': '失败',
+} as any;
+const STATUS_COLORS = {
+ '-1': '#575757',
+ '30': '#575757',
+ '32': '#F5202E',
+ '101': '#2FC25B',
+ '102': '#F5202E',
+} as any;
+const STATUS_COLOR_MAP = {
+ '-1': 'black',
+ '30': 'black',
+ '32': 'red',
+ '101': 'green',
+ '102': 'red',
+} as any;
+
+const getFilters = () => {
+ const keys = Object.keys(STATUS_MAP);
+ const filters = [];
+ for (const key of keys) {
+ filters.push({
+ text: STATUS_MAP[key],
+ value: key,
+ });
+ }
+ return filters;
+};
+
+const columns = [
+ {
+ dataIndex: 'key',
+ title: '编号',
+ width: 60,
+ },
+ {
+ dataIndex: 'topicName',
+ title: 'Topic名称',
+ width: 120,
+ ellipsis: true,
+ },
+ {
+ dataIndex: 'sumLag',
+ title: '延迟',
+ width: 100,
+ render: (value: number) => value ?? '-',
+ },
+ {
+ dataIndex: 'status',
+ title: '状态',
+ width: 100,
+ filters: getFilters(),
+ onFilter: (value: string, record: IJobDetail) => record.status === Number(value),
+ render: (t: number) => (
+
+
+
+ ),
+ },
+];
+
+export class TopicSwitchLog extends React.Component
{
+ public state = {
+ radioCheck: 'all',
+ jobDetail: [] as IJobDetail[],
+ jobState: {} as IJobState,
+ jobLog: {} as IJobLog,
+ textStr: '',
+ primaryTargetKeys: [] as string[],
+ loading: false,
+ };
+ public timer = null as number;
+ public jobId = this.props.currentCluster?.haClusterVO?.haASSwitchJobId as number;
+
+ public handleOk = () => {
+ this.props.handleVisible(false);
+ this.props.reload();
+ }
+
+ public handleCancel = () => {
+ this.props.handleVisible(false);
+ this.props.reload();
+ }
+
+ public iTimer = () => {
+ this.timer = window.setInterval(() => {
+ const { jobLog } = this.state;
+ this.getContentJobLog(jobLog.endLogId);
+ this.getContentJobState();
+ this.getContentJobDetail();
+ }, 10 * 1 * 1000);
+ }
+
+ public getTextAreaStr = (logList: ILog[]) => {
+ const strs = [];
+
+ for (const item of logList) {
+ strs.push(`${moment(item.printTime).format(timeFormat)} ${item.content}`);
+ }
+
+ return strs.join(`\n`);
+ }
+
+ public getContentJobLog = (startId?: number) => {
+ getJobLog(this.jobId, startId).then((res: IJobLog) => {
+ const { jobLog } = this.state;
+ const logList = (jobLog.logList || []);
+ logList.push(...(res?.logList || []));
+
+ const newJobLog = {
+ endLogId: res?.endLogId,
+ logList,
+ };
+
+ this.setState({
+ textStr: this.getTextAreaStr(logList),
+ jobLog: newJobLog,
+ });
+ });
+ }
+
+ public getContentJobState = () => {
+ getJobState(this.jobId).then((res: IJobState) => {
+ // 成功后清除调用
+ if (res?.jobNu === res.successNu) {
+ clearInterval(this.timer);
+ }
+ this.setState({
+ jobState: res || {},
+ });
+ });
+ }
+ public getContentJobDetail = () => {
+ getJobDetail(this.jobId).then((res: IJobDetail[]) => {
+ this.setState({
+ jobDetail: (res || []).map((row, index) => ({
+ ...row,
+ key: index,
+ })),
+ });
+ });
+ }
+
+ public switchJobs = () => {
+ const { jobState } = this.state;
+ Modal.confirm({
+ title: '强制切换',
+ content: `当前有${jobState.runningNu}个Topic切换中,${jobState.runningInTimeoutNu}个Topic切换超时,强制切换会使这些Topic有数据丢失的风险,确定强制切换吗?`,
+ onOk: () => {
+ this.setState({
+ loading: true,
+ });
+ switchAsJobs(this.jobId, {
+ action: 'force',
+ allJumpWaitInSync: true,
+ jumpWaitInSyncActiveTopicList: [],
+ }).then(res => {
+ message.success('强制切换成功');
+ }).finally(() => {
+ this.setState({
+ loading: false,
+ });
+ });
+ },
+ });
+ }
+
+ public componentWillUnmount() {
+ clearInterval(this.timer);
+ }
+
+ public componentDidMount() {
+ this.getContentJobDetail();
+ this.getContentJobState();
+ this.getContentJobLog();
+ setTimeout(this.iTimer, 0);
+ }
+
+ public render() {
+ const { visible, currentCluster } = this.props;
+ const { jobState, jobDetail, textStr, loading } = this.state;
+ const runtimeJob = jobDetail.filter(item => item.status === 32);
+ const percent = jobState?.progress;
+ return (
+
+ {runtimeJob.length ?
+
+ : null}
+
+
+
+
+
+
+
+
+
+
+ 源集群 {jobDetail?.[0]?.standbyClusterPhyName || ''}
+ 目标集群 {jobDetail?.[0]?.activeClusterPhyName || ''}
+
+
+
+
+ Topic总数 {jobState.jobNu ?? '-'} 个,
+ 切换成功 {jobState.successNu ?? '-'} 个,
+ 切换超时 {jobState.failedNu ?? '-'} 个,
+ 待切换 {jobState.waitingNu ?? '-'} 个。
+
+
+
+
+
+
+
+
+
+ );
+ }
+}
diff --git a/kafka-manager-console/src/container/modal/admin/TopicHaRelation.tsx b/kafka-manager-console/src/container/modal/admin/TopicHaRelation.tsx
new file mode 100644
index 00000000..8f3b128d
--- /dev/null
+++ b/kafka-manager-console/src/container/modal/admin/TopicHaRelation.tsx
@@ -0,0 +1,351 @@
+import * as React from 'react';
+import { admin } from 'store/admin';
+import { Modal, Form, Radio } from 'antd';
+import { IBrokersMetadata, IBrokersRegions, IMetaData } from 'types/base-type';
+import { Alert, message, notification, Table, Tooltip, Transfer } from 'component/antd';
+import { getClusterHaTopicsStatus, setHaTopics, unbindHaTopics } from 'lib/api';
+import { cellStyle } from 'constants/table';
+
+const layout = {
+ labelCol: { span: 3 },
+ wrapperCol: { span: 21 },
+};
+
+interface IXFormProps {
+ form: any;
+ reload: any;
+ formData?: any;
+ visible?: boolean;
+ handleVisible?: any;
+ currentCluster?: IMetaData;
+}
+
+interface IHaTopic {
+ clusterId: number;
+ clusterName: string;
+ haRelation: number;
+ topicName: string;
+ key: string;
+ disabled?: boolean;
+}
+
+const resColumns = [
+ {
+ title: 'TopicName',
+ dataIndex: 'topicName',
+ key: 'topicName',
+ width: 120,
+ },
+ {
+ title: '状态',
+ dataIndex: 'code',
+ key: 'code',
+ width: 60,
+ render: (t: number) => {
+ return (
+
+ {t === 0 ? '成功' : '失败'}
+
+ );
+ },
+ },
+ {
+ title: '原因',
+ dataIndex: 'message',
+ key: 'message',
+ width: 125,
+ onCell: () => ({
+ style: {
+ maxWidth: 120,
+ ...cellStyle,
+ },
+ }),
+ render: (text: string) => {
+ return (
+
+ {text}
+ );
+ },
+ },
+];
+class TopicHaRelation extends React.Component {
+ public state = {
+ radioCheck: 'spec',
+ haTopics: [] as IHaTopic[],
+ targetKeys: [] as string[],
+ confirmLoading: false,
+ firstMove: true,
+ primaryActiveKeys: [] as string[],
+ primaryStandbyKeys: [] as string[],
+ };
+
+ public handleOk = () => {
+ this.props.form.validateFields((err: any, values: any) => {
+ const unbindTopics = [];
+ const bindTopics = [];
+
+ if (values.rule === 'all') {
+ setHaTopics({
+ all: true,
+ activeClusterId: this.props.currentCluster.clusterId,
+ standbyClusterId: this.props.currentCluster.haClusterVO.clusterId,
+ topicNames: [],
+ }).then(res => {
+ handleMsg(res, '关联成功');
+ this.setState({
+ confirmLoading: false,
+ });
+ this.handleCancel();
+ });
+ return;
+ }
+
+ for (const item of this.state.primaryStandbyKeys) {
+ if (!this.state.targetKeys.includes(item)) {
+ unbindTopics.push(item);
+ }
+ }
+ for (const item of this.state.targetKeys) {
+ if (!this.state.primaryStandbyKeys.includes(item)) {
+ bindTopics.push(item);
+ }
+ }
+
+ if (!unbindTopics.length && !bindTopics.length) {
+ return message.info('请选择您要操作的Topic');
+ }
+
+ const handleMsg = (res: any[], successTip: string) => {
+ const errorRes = res.filter(item => item.code !== 0);
+
+ if (errorRes.length) {
+ Modal.confirm({
+ title: '执行结果',
+ width: 520,
+ icon: null,
+ content: (
+
+ ),
+ });
+ } else {
+ notification.success({ message: successTip });
+ }
+
+ this.props.reload();
+ };
+
+ if (bindTopics.length) {
+ this.setState({
+ confirmLoading: true,
+ });
+ setHaTopics({
+ all: false,
+ activeClusterId: this.props.currentCluster.clusterId,
+ standbyClusterId: this.props.currentCluster.haClusterVO.clusterId,
+ topicNames: bindTopics,
+ }).then(res => {
+ this.setState({
+ confirmLoading: false,
+ });
+ this.handleCancel();
+ handleMsg(res, '关联成功');
+ });
+ }
+
+ if (unbindTopics.length) {
+ this.setState({
+ confirmLoading: true,
+ });
+ unbindHaTopics({
+ all: false,
+ activeClusterId: this.props.currentCluster.clusterId,
+ standbyClusterId: this.props.currentCluster.haClusterVO.clusterId,
+ topicNames: unbindTopics,
+ }).then(res => {
+ this.setState({
+ confirmLoading: false,
+ });
+ this.handleCancel();
+ handleMsg(res, '解绑成功');
+ });
+ }
+ });
+ }
+
+ public handleCancel = () => {
+ this.props.handleVisible(false);
+ this.props.form.resetFields();
+ }
+
+ public handleRadioChange = (e: any) => {
+ this.setState({
+ radioCheck: e.target.value,
+ });
+ }
+
+ public isPrimaryStatus = (targetKeys: string[]) => {
+ const { primaryStandbyKeys } = this.state;
+ let isReset = false;
+ // 判断当前移动是否还原为最初的状态
+ if (primaryStandbyKeys.length === targetKeys.length) {
+ targetKeys.sort((a, b) => +a - (+b));
+ primaryStandbyKeys.sort((a, b) => +a - (+b));
+ let i = 0;
+ while (i < targetKeys.length) {
+ if (targetKeys[i] === primaryStandbyKeys[i]) {
+ i++;
+ } else {
+ break;
+ }
+ }
+ isReset = i === targetKeys.length;
+ }
+ return isReset;
+ }
+
+ public setTopicsStatus = (targetKeys: string[], disabled: boolean, isAll = false) => {
+ const { haTopics } = this.state;
+ const newTopics = Array.from(haTopics);
+ if (isAll) {
+ for (let i = 0; i < haTopics.length; i++) {
+ newTopics[i].disabled = disabled;
+ }
+ } else {
+ for (const key of targetKeys) {
+ const index = haTopics.findIndex(item => item.key === key);
+ if (index > -1) {
+ newTopics[index].disabled = disabled;
+ }
+ }
+ }
+ this.setState(({
+ haTopics: newTopics,
+ }));
+ }
+
+ public onTransferChange = (targetKeys: string[], direction: string, moveKeys: string[]) => {
+ const { primaryStandbyKeys, firstMove, primaryActiveKeys } = this.state;
+ // 判断当前移动是否还原为最初的状态
+ const isReset = this.isPrimaryStatus(targetKeys);
+ if (firstMove) {
+ const primaryKeys = direction === 'right' ? primaryStandbyKeys : primaryActiveKeys;
+ this.setTopicsStatus(primaryKeys, true, false);
+ this.setState(({
+ firstMove: false,
+ targetKeys,
+ }));
+ return;
+ }
+
+ // 如果是还原为初始状态则还原禁用状态
+ if (isReset) {
+ this.setTopicsStatus([], false, true);
+ this.setState(({
+ firstMove: true,
+ targetKeys,
+ }));
+ return;
+ }
+
+ this.setState({
+ targetKeys,
+ });
+ }
+
+ public componentDidMount() {
+ Promise.all([
+ getClusterHaTopicsStatus(this.props.currentCluster.clusterId, true),
+ getClusterHaTopicsStatus(this.props.currentCluster.clusterId, false),
+ ]).then(([activeRes, standbyRes]: IHaTopic[][]) => {
+ activeRes = (activeRes || []).map(row => ({
+ ...row,
+ key: row.topicName,
+ })).filter(item => item.haRelation === null);
+ standbyRes = (standbyRes || []).map(row => ({
+ ...row,
+ key: row.topicName,
+ })).filter(item => item.haRelation === 1 || item.haRelation === 0);
+ this.setState({
+ haTopics: [].concat([...activeRes, ...standbyRes]).sort((a, b) => a.topicName.localeCompare(b.topicName)),
+ primaryActiveKeys: activeRes.map(row => row.topicName),
+ primaryStandbyKeys: standbyRes.map(row => row.topicName),
+ targetKeys: standbyRes.map(row => row.topicName),
+ });
+ });
+ }
+
+ public render() {
+ const { formData = {} as any, visible, currentCluster } = this.props;
+ const { getFieldDecorator } = this.props.form;
+ let metadata = [] as IBrokersMetadata[];
+ metadata = admin.brokersMetadata ? admin.brokersMetadata : metadata;
+ let regions = [] as IBrokersRegions[];
+ regions = admin.brokersRegions ? admin.brokersRegions : regions;
+ return (
+ <>
+
+
+
+ {getFieldDecorator('rule', {
+ initialValue: 'spec',
+ rules: [{
+ required: true,
+ message: '请选择规则',
+ }],
+ })(
+ 应用于所有Topic
+ 应用于特定Topic
+ )}
+ */}
+ {this.state.radioCheck === 'spec' ?
+ {getFieldDecorator('topicNames', {
+ initialValue: this.state.targetKeys,
+ rules: [{
+ required: false,
+ message: '请选择Topic',
+ }],
+ })(
+ item.topicName}
+ titles={['未关联', '已关联']}
+ locale={{
+ itemUnit: '',
+ itemsUnit: '',
+ }}
+ />,
+ )}
+ : ''}
+
+
+ >
+ );
+ }
+}
+export const TopicHaRelationWrapper = Form.create()(TopicHaRelation);
diff --git a/kafka-manager-console/src/container/modal/admin/TopicHaSwitch.tsx b/kafka-manager-console/src/container/modal/admin/TopicHaSwitch.tsx
new file mode 100644
index 00000000..78c5565b
--- /dev/null
+++ b/kafka-manager-console/src/container/modal/admin/TopicHaSwitch.tsx
@@ -0,0 +1,718 @@
+import * as React from 'react';
+import { admin } from 'store/admin';
+import { Modal, Form, Radio, Tag, Popover, Button } from 'antd';
+import { IBrokersMetadata, IBrokersRegions, IMetaData } from 'types/base-type';
+import { Alert, Icon, message, Table, Transfer } from 'component/antd';
+import { getClusterHaTopics, getAppRelatedTopics, createSwitchTask } from 'lib/api';
+import { TooltipPlacement } from 'antd/es/tooltip';
+import * as XLSX from 'xlsx';
+import moment from 'moment';
+import { timeMinute } from 'constants/strategy';
+
+const layout = {
+ labelCol: { span: 3 },
+ wrapperCol: { span: 21 },
+};
+
+interface IXFormProps {
+ form: any;
+ reload: any;
+ formData?: any;
+ visible?: boolean;
+ handleVisible?: any;
+ currentCluster?: IMetaData;
+}
+
+interface IHaTopic {
+ clusterId: number;
+ topicName: string;
+ key: string;
+ activeClusterId: number;
+ consumeAclNum: number;
+ produceAclNum: number;
+ standbyClusterId: number;
+ status: number;
+ disabled?: boolean;
+}
+
+interface IKafkaUser {
+ clusterPhyId: number;
+ kafkaUser: string;
+ notHaTopicNameList: string[];
+ notSelectTopicNameList: string[];
+ selectedTopicNameList: string[];
+ show: boolean;
+}
+
+const columns = [
+ {
+ dataIndex: 'topicName',
+ title: '名称',
+ width: 100,
+ ellipsis: true,
+ },
+ {
+ dataIndex: 'produceAclNum',
+ title: '生产者数量',
+ width: 80,
+ },
+ {
+ dataIndex: 'consumeAclNum',
+ title: '消费者数量',
+ width: 80,
+ },
+];
+
+const kafkaUserColumn = [
+ {
+ dataIndex: 'kafkaUser',
+ title: 'kafkaUser',
+ width: 100,
+ ellipsis: true,
+ },
+ {
+ dataIndex: 'selectedTopicNameList',
+ title: '已选中Topic',
+ width: 120,
+ render: (text: string[]) => {
+ return text?.length ? renderAttributes({ data: text, limit: 3 }) : '-';
+ },
+ },
+ {
+ dataIndex: 'notSelectTopicNameList',
+ title: '选中关联Topic',
+ width: 120,
+ render: (text: string[]) => {
+ return text?.length ? renderAttributes({ data: text, limit: 3 }) : '-';
+ },
+ },
+ {
+ dataIndex: 'notHaTopicNameList',
+ title: '未建立HA Topic',
+ width: 120,
+ render: (text: string[]) => {
+ return text?.length ? renderAttributes({ data: text, limit: 3 }) : '-';
+ },
+ },
+];
+
+export const renderAttributes = (params: {
+ data: any;
+ type?: string;
+ limit?: number;
+ splitType?: string;
+ placement?: TooltipPlacement;
+}) => {
+ const { data, type = ',', limit = 2, splitType = ';', placement } = params;
+ let attrArray = data;
+ if (!Array.isArray(data) && data) {
+ attrArray = data.split(type);
+ }
+ const showItems = attrArray.slice(0, limit) || [];
+ const hideItems = attrArray.slice(limit, attrArray.length) || [];
+ const content = hideItems.map((item: string, index: number) => (
+
+ {item}
+
+ ));
+ const showItemsContent = showItems.map((item: string, index: number) => (
+
+ {item}
+
+ ));
+
+ return (
+
+ {showItems.length > 0 ? showItemsContent : '-'}
+ {hideItems.length > 0 && (
+
+ 共{attrArray.length}个
+
+ )}
+
+ );
+};
+class TopicHaSwitch extends React.Component {
+ public state = {
+ radioCheck: 'spec',
+ targetKeys: [] as string[],
+ selectedKeys: [] as string[],
+ topics: [] as IHaTopic[],
+ kafkaUsers: [] as IKafkaUser[],
+ primaryActiveKeys: [] as string[],
+ primaryStandbyKeys: [] as string[],
+ firstMove: true,
+ };
+
+ public isPrimaryStatus = (targetKeys: string[]) => {
+ const { primaryStandbyKeys } = this.state;
+ let isReset = false;
+ // 判断当前移动是否还原为最初的状态
+ if (primaryStandbyKeys.length === targetKeys.length) {
+ targetKeys.sort((a, b) => +a - (+b));
+ primaryStandbyKeys.sort((a, b) => +a - (+b));
+ let i = 0;
+ while (i < targetKeys.length) {
+ if (targetKeys[i] === primaryStandbyKeys[i]) {
+ i++;
+ } else {
+ break;
+ }
+ }
+ isReset = i === targetKeys.length;
+ }
+ return isReset;
+ }
+
+ public getTargetTopics = (currentKeys: string[], primaryKeys: string[]) => {
+ const targetTopics = [];
+ for (const key of currentKeys) {
+ if (!primaryKeys.includes(key)) {
+ const topic = this.state.topics.find(item => item.key === key)?.topicName;
+ targetTopics.push(topic);
+ }
+ }
+ return targetTopics;
+ }
+
+ public handleOk = () => {
+ const { primaryStandbyKeys, primaryActiveKeys, topics } = this.state;
+ const standbyClusterId = this.props.currentCluster.haClusterVO.clusterId;
+ const activeClusterId = this.props.currentCluster.clusterId;
+
+ this.props.form.validateFields((err: any, values: any) => {
+
+ if (values.rule === 'all') {
+ createSwitchTask({
+ activeClusterPhyId: activeClusterId,
+ all: true,
+ mustContainAllKafkaUserTopics: true,
+ standbyClusterPhyId: standbyClusterId,
+ topicNameList: [],
+ }).then(res => {
+ message.success('任务创建成功');
+ this.handleCancel();
+ this.props.reload(res);
+ });
+ return;
+ }
+ // 判断当前移动是否还原为最初的状态
+ const isPrimary = this.isPrimaryStatus(values.targetKeys || []);
+ if (isPrimary) {
+ return message.info('请选择您要切换的Topic');
+ }
+
+ // 右侧框值
+ const currentStandbyKeys = values.targetKeys || [];
+ // 左侧框值
+ const currentActiveKeys = [];
+ for (const item of topics) {
+ if (!currentStandbyKeys.includes(item.key)) {
+ currentActiveKeys.push(item.key);
+ }
+ }
+
+ const currentKeys = currentStandbyKeys.length > primaryStandbyKeys.length ? currentStandbyKeys : currentActiveKeys;
+ const primaryKeys = currentStandbyKeys.length > primaryStandbyKeys.length ? primaryStandbyKeys : primaryActiveKeys;
+ const activeClusterPhyId = currentStandbyKeys.length > primaryStandbyKeys.length ? standbyClusterId : activeClusterId;
+ const standbyClusterPhyId = currentStandbyKeys.length > primaryStandbyKeys.length ? activeClusterId : standbyClusterId;
+ const targetTopics = this.getTargetTopics(currentKeys, primaryKeys);
+ createSwitchTask({
+ activeClusterPhyId,
+ all: false,
+ mustContainAllKafkaUserTopics: true,
+ standbyClusterPhyId,
+ topicNameList: targetTopics,
+ }).then(res => {
+ message.success('任务创建成功');
+ this.handleCancel();
+ this.props.reload(res);
+ });
+ });
+ }
+
+ public handleCancel = () => {
+ this.props.handleVisible(false);
+ this.props.form.resetFields();
+ }
+
+ public handleRadioChange = (e: any) => {
+ this.setState({
+ radioCheck: e.target.value,
+ });
+ }
+
+ public getNewSelectKeys = (removeKeys: string[], selectedKeys: string[]) => {
+ const { topics, kafkaUsers } = this.state;
+ // 根据移除的key找与该key关联的其他key,一起移除
+ let relatedTopics: string[] = [];
+ const relatedKeys: string[] = [];
+ const newSelectKeys = [];
+ for (const key of removeKeys) {
+ const topicName = topics.find(row => row.key === key)?.topicName;
+ for (const item of kafkaUsers) {
+ if (item.selectedTopicNameList.includes(topicName)) {
+ relatedTopics = relatedTopics.concat(item.selectedTopicNameList);
+ relatedTopics = relatedTopics.concat(item.notSelectTopicNameList);
+ }
+ }
+ for (const item of relatedTopics) {
+ const key = topics.find(row => row.topicName === item)?.key;
+ if (key) {
+ relatedKeys.push(key);
+ }
+ }
+ for (const key of selectedKeys) {
+ if (!relatedKeys.includes(key)) {
+ newSelectKeys.push(key);
+ }
+ }
+ }
+ return newSelectKeys;
+ }
+
+ public setTopicsStatus = (targetKeys: string[], disabled: boolean, isAll = false) => {
+ const { topics } = this.state;
+ const newTopics = Array.from(topics);
+ if (isAll) {
+ for (let i = 0; i < topics.length; i++) {
+ newTopics[i].disabled = disabled;
+ }
+ } else {
+ for (const key of targetKeys) {
+ const index = topics.findIndex(item => item.key === key);
+ if (index > -1) {
+ newTopics[index].disabled = disabled;
+ }
+ }
+ }
+ this.setState(({
+ topics: newTopics,
+ }));
+ }
+
+ public getFilterTopics = (selectKeys: string[]) => {
+ // 依据key值找topicName
+ const filterTopics: string[] = [];
+ const targetKeys = selectKeys;
+ for (const key of targetKeys) {
+ const topicName = this.state.topics.find(item => item.key === key)?.topicName;
+ if (topicName) {
+ filterTopics.push(topicName);
+ }
+ }
+ return filterTopics;
+ }
+
+ public getNewKafkaUser = (targetKeys: string[]) => {
+ const { primaryStandbyKeys, topics } = this.state;
+ const removeKeys = [];
+ const addKeys = [];
+ for (const key of primaryStandbyKeys) {
+ if (targetKeys.indexOf(key) < 0) {
+ // 移除的
+ removeKeys.push(key);
+ }
+ }
+ for (const key of targetKeys) {
+ if (primaryStandbyKeys.indexOf(key) < 0) {
+ // 新增的
+ addKeys.push(key);
+ }
+ }
+ const keepKeys = [...removeKeys, ...addKeys];
+ const newKafkaUsers = this.state.kafkaUsers;
+
+ const moveTopics = this.getFilterTopics(keepKeys);
+
+ for (const topic of moveTopics) {
+ for (const item of newKafkaUsers) {
+ if (item.selectedTopicNameList.includes(topic)) {
+ item.show = true;
+ }
+ }
+ }
+
+ const showKafaUsers = newKafkaUsers.filter(item => item.show === true);
+
+ for (const item of showKafaUsers) {
+ let i = 0;
+ while (i < moveTopics.length) {
+ if (!item.selectedTopicNameList.includes(moveTopics[i])) {
+ i++;
+ } else {
+ break;
+ }
+ }
+
+ // 表示该kafkaUser不该展示
+ if (i === moveTopics.length) {
+ item.show = false;
+ }
+ }
+
+ return showKafaUsers;
+ }
+
+ public getAppRelatedTopicList = (selectedKeys: string[]) => {
+ const { topics, targetKeys, primaryStandbyKeys, kafkaUsers } = this.state;
+ const filterTopicNameList = this.getFilterTopics(selectedKeys);
+ const isReset = this.isPrimaryStatus(targetKeys);
+
+ if (!filterTopicNameList.length && isReset) {
+ // targetKeys
+ this.setState({
+ kafkaUsers: kafkaUsers.map(item => ({
+ ...item,
+ show: false,
+ })),
+ });
+ return;
+ } else {
+ // 保留选中项与移动的的项
+ this.setState({
+ kafkaUsers: this.getNewKafkaUser(targetKeys),
+ });
+ }
+
+ // 单向选择,所以取当前值的aactiveClusterId
+ const clusterPhyId = topics.find(item => item.topicName === filterTopicNameList[0]).activeClusterId;
+ getAppRelatedTopics({
+ clusterPhyId,
+ filterTopicNameList,
+ }).then((res: IKafkaUser[]) => {
+ let notSelectTopicNames: string[] = [];
+ const notSelectTopicKeys: string[] = [];
+ for (const item of (res || [])) {
+ notSelectTopicNames = notSelectTopicNames.concat(item.notSelectTopicNameList || []);
+ }
+
+ for (const item of notSelectTopicNames) {
+ const key = topics.find(row => row.topicName === item)?.key;
+
+ if (key) {
+ notSelectTopicKeys.push(key);
+ }
+ }
+
+ const newSelectedKeys = selectedKeys.concat(notSelectTopicKeys);
+ const newKafkaUsers = (res || []).map(item => ({
+ ...item,
+ show: true,
+ }));
+ const { kafkaUsers } = this.state;
+
+ for (const item of kafkaUsers) {
+ const resItem = res.find(row => row.kafkaUser === item.kafkaUser);
+ if (!resItem) {
+ newKafkaUsers.push(item);
+ }
+ }
+ this.setState({
+ kafkaUsers: newKafkaUsers,
+ selectedKeys: newSelectedKeys,
+ });
+
+ if (notSelectTopicKeys.length) {
+ this.getAppRelatedTopicList(newSelectedKeys);
+ }
+ });
+ }
+
+ public getRelatedKeys = (currentKeys: string[]) => {
+ // 未被选中的项
+ const removeKeys = [];
+ // 对比上一次记录的选中的值找出本次取消的项
+ const { selectedKeys } = this.state;
+ for (const preKey of selectedKeys) {
+ if (!currentKeys.includes(preKey)) {
+ removeKeys.push(preKey);
+ }
+ }
+
+ return removeKeys?.length ? this.getNewSelectKeys(removeKeys, currentKeys) : currentKeys;
+ }
+
+ public handleTopicChange = (sourceSelectedKeys: string[], targetSelectedKeys: string[]) => {
+ const { topics, targetKeys } = this.state;
+ // 条件限制只允许选中一边,单向操作
+ const keys = [...sourceSelectedKeys, ...targetSelectedKeys];
+
+ // 判断当前选中项属于哪一类
+ if (keys.length) {
+ const activeClusterId = topics.find(item => item.key === keys[0]).activeClusterId;
+ const needDisabledKeys = topics.filter(item => item.activeClusterId !== activeClusterId).map(row => row.key);
+ this.setTopicsStatus(needDisabledKeys, true);
+ }
+ const selectedKeys = this.state.selectedKeys.length ? this.getRelatedKeys(keys) : keys;
+
+ const isReset = this.isPrimaryStatus(targetKeys);
+ if (!selectedKeys.length && isReset) {
+ this.setTopicsStatus([], false, true);
+ }
+ this.setState({
+ selectedKeys,
+ });
+ this.getAppRelatedTopicList(selectedKeys);
+ }
+
+ public onDirectChange = (targetKeys: string[], direction: string, moveKeys: string[]) => {
+ const { primaryStandbyKeys, firstMove, primaryActiveKeys, topics } = this.state;
+
+ const getKafkaUser = () => {
+ const newKafkaUsers = this.state.kafkaUsers;
+ const moveTopics = this.getFilterTopics(moveKeys);
+ for (const topic of moveTopics) {
+ for (const item of newKafkaUsers) {
+ if (item.selectedTopicNameList.includes(topic)) {
+ item.show = true;
+ }
+ }
+ }
+ return newKafkaUsers;
+ };
+ // 判断当前移动是否还原为最初的状态
+ const isReset = this.isPrimaryStatus(targetKeys);
+ if (firstMove) {
+ const primaryKeys = direction === 'right' ? primaryStandbyKeys : primaryActiveKeys;
+ this.setTopicsStatus(primaryKeys, true, false);
+ this.setState(({
+ firstMove: false,
+ kafkaUsers: getKafkaUser(),
+ targetKeys,
+ }));
+ return;
+ }
+ // 如果是还原为初始状态则还原禁用状态
+ if (isReset) {
+ this.setTopicsStatus([], false, true);
+ this.setState(({
+ firstMove: true,
+ targetKeys,
+ kafkaUsers: [],
+ }));
+ return;
+ }
+
+ // 切换后重新判定展示项
+ this.setState(({
+ targetKeys,
+ kafkaUsers: this.getNewKafkaUser(targetKeys),
+ }));
+
+ }
+
+ public downloadData = () => {
+ const { kafkaUsers } = this.state;
+ const tableData = kafkaUsers.map(item => {
+ return {
+ // tslint:disable
+ 'kafkaUser': item.kafkaUser,
+ '已选中Topic': item.selectedTopicNameList?.join('、'),
+ '选中关联Topic': item.notSelectTopicNameList?.join('、'),
+ '未建立HA Topic': item.notHaTopicNameList?.join(`、`),
+ };
+ });
+ const data = [].concat(tableData);
+ const wb = XLSX.utils.book_new();
+ // json转sheet
+ const ws = XLSX.utils.json_to_sheet(data, {
+ header: ['kafkaUser', '已选中Topic', '选中关联Topic', '未建立HA Topic'],
+ });
+ // XLSX.utils.
+ XLSX.utils.book_append_sheet(wb, ws, 'kafkaUser');
+ // 输出
+ XLSX.writeFile(wb, 'kafkaUser-' + moment((new Date()).getTime()).format(timeMinute) + '.xlsx');
+ }
+
+ public judgeSubmitStatus = () => {
+ const { kafkaUsers } = this.state;
+
+ const newKafkaUsers = kafkaUsers.filter(item => item.show)
+ for (const item of newKafkaUsers) {
+ if (item.notHaTopicNameList.length) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public componentDidMount() {
+ const standbyClusterId = this.props.currentCluster.haClusterVO.clusterId;
+ const activeClusterId = this.props.currentCluster.clusterId;
+ getClusterHaTopics(this.props.currentCluster.clusterId, standbyClusterId).then((res: IHaTopic[]) => {
+ res = res.map((item, index) => ({
+ key: index.toString(),
+ ...item,
+ }));
+ const targetKeys = (res || []).filter((item) => item.activeClusterId === standbyClusterId).map(row => row.key);
+ const primaryActiveKeys = (res || []).filter((item) => item.activeClusterId === activeClusterId).map(row => row.key);
+ this.setState({
+ topics: res || [],
+ primaryStandbyKeys: targetKeys,
+ primaryActiveKeys,
+ targetKeys,
+ });
+ });
+ }
+
+ public render() {
+ const { visible, currentCluster } = this.props;
+ const { getFieldDecorator } = this.props.form;
+ let metadata = [] as IBrokersMetadata[];
+ metadata = admin.brokersMetadata ? admin.brokersMetadata : metadata;
+ let regions = [] as IBrokersRegions[];
+ regions = admin.brokersRegions ? admin.brokersRegions : regions;
+ const tableData = this.state.kafkaUsers.filter(row => row.show);
+
+ return (
+
+
+
+ >
+ }
+ >
+
+
+ {getFieldDecorator('rule', {
+ initialValue: 'spec',
+ rules: [{
+ required: true,
+ message: '请选择规则',
+ }],
+ })(
+ 应用于所有Topic
+ 应用于特定Topic
+ )}
+ */}
+ {this.state.radioCheck === 'spec' ?
+ {getFieldDecorator('targetKeys', {
+ initialValue: this.state.targetKeys,
+ rules: [{
+ required: false,
+ message: '请选择Topic',
+ }],
+ })(
+ ,
+ )}
+ : ''}
+
+ {this.state.radioCheck === 'spec' ?
+ <>
+
+ {this.state.kafkaUsers.length ? : null}
+ >
+ : null}
+
+ );
+ }
+}
+export const TopicSwitchWrapper = Form.create()(TopicHaSwitch);
+
+const TableTransfer = ({ leftColumns, ...restProps }: any) => (
+
+ {({
+ filteredItems,
+ direction,
+ onItemSelect,
+ selectedKeys: listSelectedKeys,
+ }) => {
+ const columns = leftColumns;
+
+ const rowSelection = {
+ columnWidth: 40,
+ getCheckboxProps: (item: any) => ({
+ disabled: item.disabled,
+ }),
+ onSelect({ key }: any, selected: any) {
+ onItemSelect(key, selected);
+ },
+ selectedRowKeys: listSelectedKeys,
+ };
+ return (
+ ({
+ onClick: () => {
+ if (disabled) return;
+ onItemSelect(key, !listSelectedKeys.includes(key));
+ },
+ })}
+ />
+ );
+ }}
+
+);
+
+interface IProps {
+ value?: any;
+ onChange?: any;
+ onDirectChange?: any;
+ currentCluster: any;
+ topicChange: any;
+ dataSource: any[];
+ selectedKeys: string[];
+}
+
+export class TransferTable extends React.Component {
+ public onChange = (nextTargetKeys: any, direction: string, moveKeys: string[]) => {
+ this.props.onDirectChange(nextTargetKeys, direction, moveKeys);
+ // tslint:disable-next-line:no-unused-expression
+ this.props.onChange && this.props.onChange(nextTargetKeys);
+ }
+
+ public render() {
+ const { currentCluster, dataSource, value, topicChange, selectedKeys } = this.props;
+ return (
+
+ );
+ }
+}
diff --git a/kafka-manager-console/src/container/modal/topic.tsx b/kafka-manager-console/src/container/modal/topic.tsx
index 4e026641..d7f797ec 100644
--- a/kafka-manager-console/src/container/modal/topic.tsx
+++ b/kafka-manager-console/src/container/modal/topic.tsx
@@ -16,6 +16,17 @@ import { modal } from 'store/modal';
import { TopicAppSelect } from '../topic/topic-app-select';
import Url from 'lib/url-parser';
import { expandRemarks, quotaRemarks } from 'constants/strategy';
+import { getAppListByClusterId } from 'lib/api';
+
+const updateApplyTopicFormModal = (clusterId: number) => {
+ const formMap = wrapper.xFormWrapper.formMap;
+ const formData = wrapper.xFormWrapper.formData;
+ getAppListByClusterId(clusterId).then(res => {
+ formMap[2].customFormItem = ;
+ // tslint:disable-next-line:no-unused-expression
+ wrapper.ref && wrapper.ref.updateFormMap$(formMap, formData);
+ });
+};
export const applyTopic = () => {
const xFormModal = {
@@ -28,6 +39,9 @@ export const applyTopic = () => {
rules: [{ required: true, message: '请选择' }],
attrs: {
placeholder: '请选择',
+ onChange(value: number) {
+ updateApplyTopicFormModal(value);
+ },
},
}, {
key: 'topicName',
@@ -49,7 +63,7 @@ export const applyTopic = () => {
type: 'custom',
defaultValue: '',
rules: [{ required: true, message: '请选择' }],
- customFormItem: ,
+ customFormItem: ,
}, {
key: 'peakBytesIn',
label: '峰值流量',
@@ -88,7 +102,7 @@ export const applyTopic = () => {
],
formData: {},
visible: true,
- title: ,
+ title: ,
okText: '确认',
// customRenderElement: 集群资源充足时,预计1分钟自动审批通过,
isWaitting: true,
@@ -106,7 +120,7 @@ export const applyTopic = () => {
};
return topic.applyTopic(quotaParams).then(data => {
window.location.href = `${urlPrefix}/user/order-detail/?orderId=${data.id}®ion=${region.currentRegion}`;
- })
+ });
},
onSubmitFaild: (err: any, ref: any, formData: any, formMap: any) => {
if (err.message === 'topic already existed') {
@@ -115,10 +129,10 @@ export const applyTopic = () => {
topicName: {
value: topic,
errors: [new Error('该topic名称已存在')],
- }
- })
+ },
+ });
}
- }
+ },
};
wrapper.open(xFormModal);
};
@@ -186,7 +200,7 @@ export const showApplyQuatoModal = (item: ITopic | IAppsIdInfo, record: IQuotaQu
// rules: [{ required: true, message: '' }],
// attrs: { disabled: true },
// invisible: !item.hasOwnProperty('clusterName'),
- // },
+ // },
{
key: 'topicName',
label: 'Topic名称',
@@ -300,7 +314,7 @@ export const showTopicApplyQuatoModal = (item: ITopic) => {
// attrs: { disabled: true },
// defaultValue: item.clusterName,
// // invisible: !item.hasOwnProperty('clusterName'),
- // },
+ // },
{
key: 'topicName',
label: 'Topic名称',
@@ -380,12 +394,19 @@ export const showTopicApplyQuatoModal = (item: ITopic) => {
consumeQuota: transMBToB(value.consumeQuota),
produceQuota: transMBToB(value.produceQuota),
});
+
+ if (item.isPhysicalClusterId) {
+ Object.assign(quota, {
+ isPhysicalClusterId: true,
+ });
+ }
const quotaParams = {
type: 2,
applicant: users.currentUser.username,
description: value.description,
extensions: JSON.stringify(quota),
};
+
topic.applyQuota(quotaParams).then((data) => {
notification.success({ message: '申请配额成功' });
window.location.href = `${urlPrefix}/user/order-detail/?orderId=${data.id}®ion=${region.currentRegion}`;
@@ -454,23 +475,24 @@ const judgeAccessStatus = (access: number) => {
export const showAllPermissionModal = (item: ITopic) => {
let appId: string = null;
+ app.getAppListByClusterId(item.clusterId).then(res => {
+ if (!app.clusterAppData || !app.clusterAppData.length) {
+ return notification.info({
+ message: (
+ <>
+
+ 您的账号暂无可用应用,请先
+ 申请应用
+
+ >),
+ });
+ }
+ const index = app.clusterAppData.findIndex(row => row.appId === item.appId);
- if (!app.data || !app.data.length) {
- return notification.info({
- message: (
- <>
-
- 您的账号暂无可用应用,请先
- 申请应用
-
- >),
+ appId = index > -1 ? item.appId : app.clusterAppData[0].appId;
+ topic.getAuthorities(appId, item.clusterId, item.topicName).then((data) => {
+ showAllPermission(appId, item, data.access);
});
- }
- const index = app.data.findIndex(row => row.appId === item.appId);
-
- appId = index > -1 ? item.appId : app.data[0].appId;
- topic.getAuthorities(appId, item.clusterId, item.topicName).then((data) => {
- showAllPermission(appId, item, data.access);
});
};
@@ -494,7 +516,7 @@ const showAllPermission = (appId: string, item: ITopic, access: number) => {
defaultValue: appId,
rules: [{ required: true, message: '请选择应用' }],
type: 'custom',
- customFormItem: ,
+ customFormItem: ,
},
{
key: 'access',
diff --git a/kafka-manager-console/src/container/search-filter.tsx b/kafka-manager-console/src/container/search-filter.tsx
index f6ed09fa..ca621c03 100644
--- a/kafka-manager-console/src/container/search-filter.tsx
+++ b/kafka-manager-console/src/container/search-filter.tsx
@@ -18,7 +18,7 @@ interface IFilterParams {
}
interface ISearchAndFilterState {
- [filter: string]: boolean | string | number | any[];
+ [filter: string]: boolean | string | number | any;
}
export class SearchAndFilterContainer extends React.Component {
diff --git a/kafka-manager-console/src/container/topic/topic-detail/index.tsx b/kafka-manager-console/src/container/topic/topic-detail/index.tsx
index 0220341b..4745acba 100644
--- a/kafka-manager-console/src/container/topic/topic-detail/index.tsx
+++ b/kafka-manager-console/src/container/topic/topic-detail/index.tsx
@@ -331,11 +331,13 @@ export class TopicDetail extends React.Component {
public render() {
const role = users.currentUser.role;
const baseInfo = topic.baseInfo as ITopicBaseInfo;
- const showEditBtn = (role == 1 || role == 2) || (topic.topicBusiness && topic.topicBusiness.principals.includes(users.currentUser.username));
+ const showEditBtn = (role == 1 || role == 2) ||
+ (topic.topicBusiness && topic.topicBusiness.principals.includes(users.currentUser.username));
const topicRecord = {
clusterId: this.clusterId,
topicName: this.topicName,
- clusterName: this.clusterName
+ clusterName: this.clusterName,
+ isPhysicalClusterId: !!this.isPhysicalTrue,
} as ITopic;
return (
@@ -349,9 +351,12 @@ export class TopicDetail extends React.Component {
title={this.topicName || ''}
extra={
<>
- {this.needAuth == "true" && }
-
-
+ {this.needAuth == 'true' &&
+ }
+ {baseInfo.haRelation === 0 ? null :
+ }
+ {baseInfo.haRelation === 0 ? null :
+ }
{/* {showEditBtn && } */}
diff --git a/kafka-manager-console/src/lib/api.ts b/kafka-manager-console/src/lib/api.ts
index 39bb63ff..d0200653 100644
--- a/kafka-manager-console/src/lib/api.ts
+++ b/kafka-manager-console/src/lib/api.ts
@@ -248,6 +248,10 @@ export const getAppTopicList = (appId: string, mine: boolean) => {
return fetch(`/normal/apps/${appId}/topics?mine=${mine}`);
};
+export const getAppListByClusterId = (clusterId: number) => {
+ return fetch(`/normal/apps/${clusterId}`);
+};
+
/**
* 专家服务
*/
@@ -418,8 +422,69 @@ export const getMetaData = (needDetail: boolean = true) => {
return fetch(`/rd/clusters/basic-info?need-detail=${needDetail}`);
};
+export const getHaMetaData = () => {
+ return fetch(`/rd/clusters/ha/basic-info`);
+};
+
+export const getClusterHaTopics = (firstClusterId: number, secondClusterId?: number) => {
+ return fetch(`/rd/clusters/${firstClusterId}/ha-topics?secondClusterId=${secondClusterId || ''}`);
+};
+
+export const getClusterHaTopicsStatus = (firstClusterId: number, checkMetadata: boolean) => {
+ return fetch(`/rd/clusters/${firstClusterId}/ha-topics/status?checkMetadata=${checkMetadata}`);
+};
+
+export const setHaTopics = (params: any) => {
+ return fetch(`/op/ha-topics`, {
+ method: 'POST',
+ body: JSON.stringify(params),
+ });
+};
+
+export const getAppRelatedTopics = (params: any) => {
+ return fetch(`/rd/apps/relate-topics
+ `, {
+ method: 'POST',
+ body: JSON.stringify(params),
+ });
+};
+// 取消Topic高可用
+export const unbindHaTopics = (params: any) => {
+ return fetch(`/op/ha-topics`, {
+ method: 'DELETE',
+ body: JSON.stringify(params),
+ });
+};
+
+// 创建Topic主备切换任务
+export const createSwitchTask = (params: any) => {
+ return fetch(`/op/as-switch-jobs`, {
+ method: 'POST',
+ body: JSON.stringify(params),
+ });
+};
+
+export const getJobDetail = (jobId: number) => {
+ return fetch(`/op/as-switch-jobs/${jobId}/job-detail`);
+};
+
+export const getJobLog = (jobId: number, startLogId?: number) => {
+ return fetch(`/op/as-switch-jobs/${jobId}/job-logs?startLogId=${startLogId || ''}`);
+};
+
+export const getJobState = (jobId: number) => {
+ return fetch(`/op/as-switch-jobs/${jobId}/job-state`);
+};
+
+export const switchAsJobs = (jobId: number, params: any) => {
+ return fetch(`/op/as-switch-jobs/${jobId}/action`, {
+ method: 'PUT',
+ body: JSON.stringify(params),
+ });
+};
+
export const getOperationRecordData = (params: any) => {
- return fetch(`/rd/operate-record`,{
+ return fetch(`/rd/operate-record`, {
method: 'POST',
body: JSON.stringify(params),
});
@@ -569,15 +634,15 @@ export const getCandidateController = (clusterId: number) => {
return fetch(`/rd/clusters/${clusterId}/controller-preferred-candidates`);
};
-export const addCandidateController = (params:any) => {
- return fetch(`/op/cluster-controller/preferred-candidates`, {
+export const addCandidateController = (params: any) => {
+ return fetch(`/op/cluster-controller/preferred-candidates`, {
method: 'POST',
body: JSON.stringify(params),
});
};
-export const deleteCandidateCancel = (params:any)=>{
- return fetch(`/op/cluster-controller/preferred-candidates`, {
+export const deleteCandidateCancel = (params: any) => {
+ return fetch(`/op/cluster-controller/preferred-candidates`, {
method: 'DELETE',
body: JSON.stringify(params),
});
diff --git a/kafka-manager-console/src/lib/fetch.ts b/kafka-manager-console/src/lib/fetch.ts
index ef307ccb..f51fd7d4 100644
--- a/kafka-manager-console/src/lib/fetch.ts
+++ b/kafka-manager-console/src/lib/fetch.ts
@@ -33,7 +33,6 @@ const checkStatus = (res: Response) => {
};
const filter = (init: IInit) => (res: IRes) => {
-
if (res.code !== 0 && res.code !== 200) {
if (!init.errorNoTips) {
notification.error({
@@ -117,7 +116,7 @@ export default function fetch(url: string, init?: IInit) {
export function formFetch(url: string, init?: IInit) {
url = url.indexOf('?') > 0 ?
- `${url}&dataCenter=${region.currentRegion}` : `${url}?dataCenter=${region.currentRegion}`;
+ `${url}&dataCenter=${region.currentRegion}` : `${url}?dataCenter=${region.currentRegion}`;
let realUrl = url;
if (!/^http(s)?:\/\//.test(url)) {
@@ -127,8 +126,8 @@ export function formFetch(url: string, init?: IInit) {
init = addCustomHeader(init);
return window
- .fetch(realUrl, init)
- .then(res => checkStatus(res))
- .then((res) => res.json())
- .then(filter(init));
+ .fetch(realUrl, init)
+ .then(res => checkStatus(res))
+ .then((res) => res.json())
+ .then(filter(init));
}
diff --git a/kafka-manager-console/src/routers/page/index.less b/kafka-manager-console/src/routers/page/index.less
index e4559814..21415a74 100644
--- a/kafka-manager-console/src/routers/page/index.less
+++ b/kafka-manager-console/src/routers/page/index.less
@@ -1,4 +1,3 @@
-
* {
padding: 0;
margin: 0;
@@ -13,7 +12,9 @@ li {
list-style-type: none;
}
-html, body, .router-nav {
+html,
+body,
+.router-nav {
width: 100%;
height: 100%;
font-family: PingFangSC-Regular;
@@ -52,11 +53,12 @@ html, body, .router-nav {
color: @primary-color;
}
-.ant-table-thead > tr > th, .ant-table-tbody > tr > td {
+.ant-table-thead>tr>th,
+.ant-table-tbody>tr>td {
padding: 13px;
}
-.ant-table-tbody > tr > td {
+.ant-table-tbody>tr>td {
background: #fff;
}
@@ -72,15 +74,11 @@ html, body, .router-nav {
overflow: auto;
}
-.ant-form-item {
- margin-bottom: 16px;
-}
-
.mb-24 {
margin-bottom: 24px;
}
-.ant-table-thead > tr > th .ant-table-filter-icon {
+.ant-table-thead>tr>th .ant-table-filter-icon {
right: initial;
}
@@ -100,7 +98,7 @@ html, body, .router-nav {
margin-left: 10px;
}
-.config-info{
+.config-info {
white-space: pre-line;
height: 100%;
overflow-y: scroll;
@@ -112,5 +110,4 @@ html, body, .router-nav {
margin-left: 10px;
cursor: pointer;
font-size: 12px;
-}
-
+}
\ No newline at end of file
diff --git a/kafka-manager-console/src/routers/router.tsx b/kafka-manager-console/src/routers/router.tsx
index 164eb370..192e55ef 100644
--- a/kafka-manager-console/src/routers/router.tsx
+++ b/kafka-manager-console/src/routers/router.tsx
@@ -1,6 +1,7 @@
import { BrowserRouter as Router, Route } from 'react-router-dom';
import { hot } from 'react-hot-loader/root';
import * as React from 'react';
+import zhCN from 'antd/lib/locale/zh_CN';
import Home from './page/topic';
import Admin from './page/admin';
@@ -12,58 +13,62 @@ import { urlPrefix } from 'constants/left-menu';
import ErrorPage from './page/error';
import Login from './page/login';
import InfoPage from './page/info';
+import { ConfigProvider } from 'antd';
class RouterDom extends React.Component {
public render() {
return (
-
-
-
-
+
-
-
+
+
+
+
-
-
+
+
-
-
+
+
-
-
+
+
-
-
+
+
-
-
-
-
+
+
+
+
+
+
+
+
);
}
}
diff --git a/kafka-manager-console/src/store/admin.ts b/kafka-manager-console/src/store/admin.ts
index 582950a3..c7957788 100644
--- a/kafka-manager-console/src/store/admin.ts
+++ b/kafka-manager-console/src/store/admin.ts
@@ -57,8 +57,9 @@ import {
getBillStaffDetail,
getCandidateController,
addCandidateController,
- deleteCandidateCancel
- } from 'lib/api';
+ deleteCandidateCancel,
+ getHaMetaData,
+} from 'lib/api';
import { getControlMetricOption, getClusterMetricOption } from 'lib/line-charts-config';
import { copyValueMap } from 'constants/status-map';
@@ -104,12 +105,15 @@ class Admin {
@observable
public metaList: IMetaData[] = [];
+ @observable
+ public haMetaList: IMetaData[] = [];
+
@observable
public oRList: any[] = [];
@observable
- public oRparams:any={
- moduleId:0
+ public oRparams: any = {
+ moduleId: 0
};
@observable
@@ -169,9 +173,9 @@ class Admin {
@observable
public controllerCandidate: IController[] = [];
- @observable
+ @observable
public filtercontrollerCandidate: string = '';
-
+
@observable
public brokersPartitions: IBrokersPartitions[] = [];
@@ -329,9 +333,20 @@ class Admin {
}
@action.bound
- public setOperationRecordList(data:any){
+ public setHaMetaList(data: IMetaData[]) {
this.setLoading(false);
- this.oRList = data ? data.map((item:any, index: any) => {
+ this.haMetaList = data ? data.map((item, index) => {
+ item.key = index;
+ return item;
+ }) : [];
+ this.haMetaList = this.haMetaList.sort((a, b) => a.clusterId - b.clusterId);
+ return this.haMetaList;
+ }
+
+ @action.bound
+ public setOperationRecordList(data: any) {
+ this.setLoading(false);
+ this.oRList = data ? data.map((item: any, index: any) => {
item.key = index;
return item;
}) : [];
@@ -394,9 +409,9 @@ class Admin {
item.key = index;
return item;
}) : [];
- this.filtercontrollerCandidate = data?data.map((item,index)=>{
+ this.filtercontrollerCandidate = data ? data.map((item, index) => {
return item.brokerId
- }).join(','):''
+ }).join(',') : ''
}
@action.bound
@@ -479,8 +494,8 @@ class Admin {
}
@action.bound
- public setBrokersMetadata(data: IBrokersMetadata[]|any) {
- this.brokersMetadata = data ? data.map((item:any, index:any) => {
+ public setBrokersMetadata(data: IBrokersMetadata[] | any) {
+ this.brokersMetadata = data ? data.map((item: any, index: any) => {
item.key = index;
return {
...item,
@@ -675,6 +690,11 @@ class Admin {
getMetaData(needDetail).then(this.setMetaList);
}
+ public getHaMetaData() {
+ this.setLoading(true);
+ return getHaMetaData().then(this.setHaMetaList);
+ }
+
public getOperationRecordData(params: any) {
this.setLoading(true);
this.oRparams = params
@@ -738,17 +758,17 @@ class Admin {
}
public getCandidateController(clusterId: number) {
- return getCandidateController(clusterId).then(data=>{
+ return getCandidateController(clusterId).then(data => {
return this.setCandidateController(data)
});
}
public addCandidateController(clusterId: number, brokerIdList: any) {
- return addCandidateController({clusterId, brokerIdList}).then(()=>this.getCandidateController(clusterId));
+ return addCandidateController({ clusterId, brokerIdList }).then(() => this.getCandidateController(clusterId));
}
- public deleteCandidateCancel(clusterId: number, brokerIdList: any){
- return deleteCandidateCancel({clusterId, brokerIdList}).then(()=>this.getCandidateController(clusterId));
+ public deleteCandidateCancel(clusterId: number, brokerIdList: any) {
+ return deleteCandidateCancel({ clusterId, brokerIdList }).then(() => this.getCandidateController(clusterId));
}
public getBrokersBasicInfo(clusterId: number, brokerId: number) {
diff --git a/kafka-manager-console/src/store/app.ts b/kafka-manager-console/src/store/app.ts
index a3af345f..a64c93a7 100644
--- a/kafka-manager-console/src/store/app.ts
+++ b/kafka-manager-console/src/store/app.ts
@@ -1,5 +1,5 @@
import { observable, action } from 'mobx';
-import { getAppList, getAppDetail, getAppTopicList, applyOrder, modfiyApplication, modfiyAdminApp, getAdminAppList, getAppsConnections, getTopicAppQuota } from 'lib/api';
+import { getAppList, getAppDetail, getAppTopicList, applyOrder, modfiyApplication, modfiyAdminApp, getAdminAppList, getAppsConnections, getTopicAppQuota, getAppListByClusterId } from 'lib/api';
import { IAppItem, IAppQuota, ITopic, IOrderParams, IConnectionInfo } from 'types/base-type';
class App {
@@ -12,6 +12,9 @@ class App {
@observable
public data: IAppItem[] = [];
+ @observable
+ public clusterAppData: IAppItem[] = [];
+
@observable
public adminAppData: IAppItem[] = [];
@@ -19,7 +22,7 @@ class App {
public selectData: IAppItem[] = [{
appId: '-1',
name: '所有关联应用',
- } as IAppItem,
+ } as IAppItem,
];
@observable
@@ -51,12 +54,12 @@ class App {
@action.bound
public setTopicAppQuota(data: IAppQuota[]) {
return this.appQuota = data.map((item, index) => {
- return {
- ...item,
- label: item.appName,
- value: item.appId,
- key: index,
- };
+ return {
+ ...item,
+ label: item.appName,
+ value: item.appId,
+ key: index,
+ };
});
}
@@ -87,6 +90,16 @@ class App {
this.setLoading(false);
}
+ @action.bound
+ public setClusterAppData(data: IAppItem[] = []) {
+ this.clusterAppData = data.map((item, index) => ({
+ ...item,
+ key: index,
+ principalList: item.principals ? item.principals.split(',') : [],
+ }));
+ return this.clusterAppData;
+ }
+
@action.bound
public setAdminData(data: IAppItem[] = []) {
this.adminAppData = data.map((item, index) => ({
@@ -133,6 +146,10 @@ class App {
getAppList().then(this.setData);
}
+ public getAppListByClusterId(clusterId: number) {
+ return getAppListByClusterId(clusterId).then(this.setClusterAppData);
+ }
+
public getTopicAppQuota(clusterId: number, topicName: string) {
return getTopicAppQuota(clusterId, topicName).then(this.setTopicAppQuota);
}
diff --git a/kafka-manager-console/src/store/topic.ts b/kafka-manager-console/src/store/topic.ts
index b47c1122..cacb7bf4 100644
--- a/kafka-manager-console/src/store/topic.ts
+++ b/kafka-manager-console/src/store/topic.ts
@@ -37,6 +37,7 @@ export interface ITopicBaseInfo {
physicalClusterId: number;
percentile: string;
regionNameList: any;
+ haRelation: number;
}
export interface IRealTimeTraffic {
diff --git a/kafka-manager-console/src/types/base-type.ts b/kafka-manager-console/src/types/base-type.ts
index f0858c3a..9f8f73c1 100644
--- a/kafka-manager-console/src/types/base-type.ts
+++ b/kafka-manager-console/src/types/base-type.ts
@@ -474,7 +474,14 @@ export interface IMetaData {
status: number;
topicNum: number;
zookeeper: string;
+ haRelation?: number;
+ haASSwitchJobId?: number;
+ haStatus?: number;
+ haClusterVO?: IMetaData;
+ activeTopicCount?: number;
+ standbyTopicCount?: number;
key?: number;
+ mutualBackupClusterName?: string;
}
export interface IConfigure {
@@ -641,6 +648,7 @@ export interface IClusterTopics {
properties: any;
clusterName: string;
logicalClusterId: number;
+ haRelation?: number;
key?: number;
}
diff --git a/kafka-manager-console/webpack.config.js b/kafka-manager-console/webpack.config.js
index d6d12fa8..1608de20 100644
--- a/kafka-manager-console/webpack.config.js
+++ b/kafka-manager-console/webpack.config.js
@@ -130,9 +130,7 @@ module.exports = {
historyApiFallback: true,
proxy: {
'/api/v1/': {
- // target: 'http://127.0.0.1:8080',
- target: 'http://10.179.37.199:8008',
- // target: 'http://99.11.45.164:8888',
+ target: 'http://127.0.0.1:8080/',
changeOrigin: true,
}
},
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/HaASRelationManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/HaASRelationManager.java
new file mode 100644
index 00000000..11c79127
--- /dev/null
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/HaASRelationManager.java
@@ -0,0 +1,32 @@
+package com.xiaojukeji.kafka.manager.service.biz.ha;
+
+import com.xiaojukeji.kafka.manager.common.entity.Result;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO;
+import com.xiaojukeji.kafka.manager.common.entity.vo.ha.HaClusterTopicVO;
+import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.HaClusterTopicHaStatusVO;
+
+import java.util.List;
+
+public interface HaASRelationManager {
+ /**
+ * 获取集群主备信息
+ */
+ List getHATopics(Long firstClusterPhyId, Long secondClusterPhyId, boolean filterSystemTopics);
+
+ /**
+ * 获取集群Topic的主备状态信息
+ */
+ Result> listHaStatusTopics(Long clusterPhyId, Boolean checkMetadata);
+
+
+ /**
+ * 获取获取集群topic高可用关系 0:备topic, 1:主topic, -1非高可用
+ */
+ Integer getRelation(Long clusterId, String topicName);
+
+ /**
+ * 获取获取集群topic高可用关系
+ */
+ HaASRelationDO getASRelation(Long clusterId, String topicName);
+
+}
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/HaAppManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/HaAppManager.java
new file mode 100644
index 00000000..c1a480a5
--- /dev/null
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/HaAppManager.java
@@ -0,0 +1,16 @@
+package com.xiaojukeji.kafka.manager.service.biz.ha;
+
+import com.xiaojukeji.kafka.manager.common.entity.Result;
+import com.xiaojukeji.kafka.manager.common.entity.vo.rd.app.AppRelateTopicsVO;
+
+import java.util.List;
+
+
+/**
+ * Ha App管理
+ */
+public interface HaAppManager {
+ Result> appRelateTopics(Long clusterPhyId, List filterTopicNameList);
+
+ boolean isContainAllRelateAppTopics(Long clusterPhyId, List filterTopicNameList);
+}
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/HaClusterManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/HaClusterManager.java
new file mode 100644
index 00000000..7b25e2c0
--- /dev/null
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/HaClusterManager.java
@@ -0,0 +1,19 @@
+package com.xiaojukeji.kafka.manager.service.biz.ha;
+
+import com.xiaojukeji.kafka.manager.common.entity.Result;
+import com.xiaojukeji.kafka.manager.common.entity.ao.ClusterDetailDTO;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
+
+import java.util.List;
+
+/**
+ * Ha Cluster管理
+ */
+public interface HaClusterManager {
+ List getClusterDetailDTOList(Boolean needDetail);
+
+ Result addNew(ClusterDO clusterDO, Long activeClusterId, String operator);
+
+ Result deleteById(Long clusterId, String operator);
+
+}
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/HaTopicManager.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/HaTopicManager.java
new file mode 100644
index 00000000..b9755e55
--- /dev/null
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/HaTopicManager.java
@@ -0,0 +1,44 @@
+package com.xiaojukeji.kafka.manager.service.biz.ha;
+
+import com.xiaojukeji.kafka.manager.common.entity.Result;
+import com.xiaojukeji.kafka.manager.common.entity.TopicOperationResult;
+import com.xiaojukeji.kafka.manager.common.entity.ao.ha.HaSwitchTopic;
+import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.HaTopicRelationDTO;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.JobLogDO;
+
+import java.util.List;
+
+
+/**
+ * Ha Topic管理
+ */
+public interface HaTopicManager {
+ /**
+ * 批量更改主备关系
+ */
+ Result> batchCreateHaTopic(HaTopicRelationDTO dto, String operator);
+
+ /**
+ * 批量更改主备关系
+ */
+ Result> batchRemoveHaTopic(HaTopicRelationDTO dto, String operator);
+
+ /**
+ * 可重试的执行主备切换
+ * @param newActiveClusterPhyId 主集群
+ * @param newStandbyClusterPhyId 备集群
+ * @param switchTopicNameList 切换的Topic列表
+ * @param focus 强制切换
+ * @param firstTriggerExecute 第一次触发执行
+ * @param switchLogTemplate 切换日志模版
+ * @param operator 操作人
+ * @return 操作结果
+ */
+ Result switchHaWithCanRetry(Long newActiveClusterPhyId,
+ Long newStandbyClusterPhyId,
+ List switchTopicNameList,
+ boolean focus,
+ boolean firstTriggerExecute,
+ JobLogDO switchLogTemplate,
+ String operator);
+}
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/impl/HaASRelationManagerImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/impl/HaASRelationManagerImpl.java
new file mode 100644
index 00000000..306671a0
--- /dev/null
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/impl/HaASRelationManagerImpl.java
@@ -0,0 +1,140 @@
+package com.xiaojukeji.kafka.manager.service.biz.ha.impl;
+
+import com.xiaojukeji.kafka.manager.common.bizenum.TopicAuthorityEnum;
+import com.xiaojukeji.kafka.manager.common.bizenum.ha.HaRelationTypeEnum;
+import com.xiaojukeji.kafka.manager.common.bizenum.ha.HaResTypeEnum;
+import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
+import com.xiaojukeji.kafka.manager.common.entity.Result;
+import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO;
+import com.xiaojukeji.kafka.manager.common.entity.vo.ha.HaClusterTopicVO;
+import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.HaClusterTopicHaStatusVO;
+import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
+import com.xiaojukeji.kafka.manager.service.biz.ha.HaASRelationManager;
+import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
+import com.xiaojukeji.kafka.manager.service.service.TopicManagerService;
+import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService;
+import com.xiaojukeji.kafka.manager.service.service.ha.HaASRelationService;
+import com.xiaojukeji.kafka.manager.service.service.ha.HaTopicService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+@Service
+public class HaASRelationManagerImpl implements HaASRelationManager {
+ @Autowired
+ private HaASRelationService haASRelationService;
+
+ @Autowired
+ private TopicManagerService topicManagerService;
+
+ @Autowired
+ private HaTopicService haTopicService;
+
+ @Autowired
+ private AuthorityService authorityService;
+
+ @Override
+ public List getHATopics(Long firstClusterPhyId, Long secondClusterPhyId, boolean filterSystemTopics) {
+ List doList = haASRelationService.listAllHAFromDB(firstClusterPhyId, secondClusterPhyId, HaResTypeEnum.TOPIC);
+ if (ValidateUtils.isEmptyList(doList)) {
+ return new ArrayList<>();
+ }
+
+ List voList = new ArrayList<>();
+ for (HaASRelationDO relationDO: doList) {
+ if (filterSystemTopics
+ && (relationDO.getActiveResName().startsWith("__") || relationDO.getStandbyResName().startsWith("__"))) {
+ // 过滤掉系统Topic && 存在系统Topic,则过滤掉
+ continue;
+ }
+
+ HaClusterTopicVO vo = new HaClusterTopicVO();
+ vo.setClusterId(firstClusterPhyId);
+ if (firstClusterPhyId.equals(relationDO.getActiveClusterPhyId())) {
+ vo.setTopicName(relationDO.getActiveResName());
+ } else {
+ vo.setTopicName(relationDO.getStandbyResName());
+ }
+
+ vo.setProduceAclNum(0);
+ vo.setConsumeAclNum(0);
+ vo.setActiveClusterId(relationDO.getActiveClusterPhyId());
+ vo.setStandbyClusterId(relationDO.getStandbyClusterPhyId());
+ vo.setStatus(relationDO.getStatus());
+
+ // 补充ACL信息
+ List authorityDOList = authorityService.getAuthorityByTopicFromCache(relationDO.getActiveClusterPhyId(), relationDO.getActiveResName());
+ authorityDOList.forEach(elem -> {
+ if ((elem.getAccess() & TopicAuthorityEnum.WRITE.getCode()) > 0) {
+ vo.setProduceAclNum(vo.getProduceAclNum() + 1);
+ }
+ if ((elem.getAccess() & TopicAuthorityEnum.READ.getCode()) > 0) {
+ vo.setConsumeAclNum(vo.getConsumeAclNum() + 1);
+ }
+ });
+
+ voList.add(vo);
+ }
+
+ return voList;
+ }
+
+ @Override
+ public Result> listHaStatusTopics(Long clusterPhyId, Boolean checkMetadata) {
+ ClusterDO clusterDO = PhysicalClusterMetadataManager.getClusterFromCache(clusterPhyId);
+ if (clusterDO == null){
+ return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
+ }
+ List topicDOS = topicManagerService.getByClusterId(clusterPhyId);
+ if (ValidateUtils.isEmptyList(topicDOS)) {
+ return Result.buildSuc(new ArrayList<>());
+ }
+
+ Map haRelationMap = haTopicService.getRelation(clusterPhyId);
+ List statusVOS = new ArrayList<>();
+ topicDOS.stream().filter(topicDO -> !topicDO.getTopicName().startsWith("__"))//过滤引擎自带topic
+ .forEach(topicDO -> {
+ if(checkMetadata && !PhysicalClusterMetadataManager.isTopicExist(clusterPhyId, topicDO.getTopicName())){
+ return;
+ }
+ HaClusterTopicHaStatusVO statusVO = new HaClusterTopicHaStatusVO();
+ statusVO.setClusterId(clusterPhyId);
+ statusVO.setClusterName(clusterDO.getClusterName());
+ statusVO.setTopicName(topicDO.getTopicName());
+ statusVO.setHaRelation(haRelationMap.get(topicDO.getTopicName()));
+ statusVOS.add(statusVO);
+ });
+
+ return Result.buildSuc(statusVOS);
+ }
+
+ @Override
+ public Integer getRelation(Long clusterId, String topicName) {
+ HaASRelationDO relationDO = haASRelationService.getHAFromDB(clusterId, topicName, HaResTypeEnum.TOPIC);
+ if (relationDO == null){
+ return HaRelationTypeEnum.UNKNOWN.getCode();
+ }
+ if (topicName.equals(KafkaConstant.COORDINATOR_TOPIC_NAME)){
+ return HaRelationTypeEnum.MUTUAL_BACKUP.getCode();
+ }
+ if (clusterId.equals(relationDO.getActiveClusterPhyId())){
+ return HaRelationTypeEnum.ACTIVE.getCode();
+ }
+ if (clusterId.equals(relationDO.getStandbyClusterPhyId())){
+ return HaRelationTypeEnum.STANDBY.getCode();
+ }
+ return HaRelationTypeEnum.UNKNOWN.getCode();
+ }
+
+ @Override
+ public HaASRelationDO getASRelation(Long clusterId, String topicName) {
+ return haASRelationService.getHAFromDB(clusterId, topicName, HaResTypeEnum.TOPIC);
+ }
+}
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/impl/HaAppManagerImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/impl/HaAppManagerImpl.java
new file mode 100644
index 00000000..19ffc5ae
--- /dev/null
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/impl/HaAppManagerImpl.java
@@ -0,0 +1,94 @@
+package com.xiaojukeji.kafka.manager.service.biz.ha.impl;
+
+import com.xiaojukeji.kafka.manager.common.bizenum.ha.HaResTypeEnum;
+import com.xiaojukeji.kafka.manager.common.entity.Result;
+import com.xiaojukeji.kafka.manager.common.entity.vo.rd.app.AppRelateTopicsVO;
+import com.xiaojukeji.kafka.manager.service.biz.ha.HaAppManager;
+import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService;
+import com.xiaojukeji.kafka.manager.service.service.ha.HaASRelationService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+
+@Service
+public class HaAppManagerImpl implements HaAppManager {
+
+ @Autowired
+ private AuthorityService authorityService;
+
+ @Autowired
+ private HaASRelationService haASRelationService;
+
+ @Override
+ public Result> appRelateTopics(Long clusterPhyId, List filterTopicNameList) {
+ // 获取关联的Topic列表
+ Map> userTopicMap = this.appRelateTopicsMap(clusterPhyId, filterTopicNameList);
+
+ // 获取集群已建立HA的Topic列表
+ Set haTopicNameSet = haASRelationService.listAllHAFromDB(clusterPhyId, HaResTypeEnum.TOPIC)
+ .stream()
+ .map(elem -> elem.getActiveResName())
+ .collect(Collectors.toSet());
+
+ Set filterTopicNameSet = new HashSet<>(filterTopicNameList);
+
+ List voList = new ArrayList<>();
+ for (Map.Entry> entry: userTopicMap.entrySet()) {
+ AppRelateTopicsVO vo = new AppRelateTopicsVO();
+ vo.setClusterPhyId(clusterPhyId);
+ vo.setKafkaUser(entry.getKey());
+ vo.setSelectedTopicNameList(new ArrayList<>());
+ vo.setNotSelectTopicNameList(new ArrayList<>());
+ vo.setNotHaTopicNameList(new ArrayList<>());
+ entry.getValue().forEach(elem -> {
+ if (elem.startsWith("__")) {
+ // ignore
+ return;
+ }
+
+ if (!haTopicNameSet.contains(elem)) {
+ vo.getNotHaTopicNameList().add(elem);
+ } else if (filterTopicNameSet.contains(elem)) {
+ vo.getSelectedTopicNameList().add(elem);
+ } else {
+ vo.getNotSelectTopicNameList().add(elem);
+ }
+ });
+
+ voList.add(vo);
+ }
+
+ return Result.buildSuc(voList);
+ }
+
+ @Override
+ public boolean isContainAllRelateAppTopics(Long clusterPhyId, List filterTopicNameList) {
+ Map> userTopicMap = this.appRelateTopicsMap(clusterPhyId, filterTopicNameList);
+
+ Set relateTopicSet = new HashSet<>();
+ userTopicMap.values().forEach(elem -> relateTopicSet.addAll(elem));
+
+ return filterTopicNameList.containsAll(relateTopicSet);
+ }
+
+ private Map> appRelateTopicsMap(Long clusterPhyId, List filterTopicNameList) {
+ Map> userTopicMap = new HashMap<>();
+ for (String topicName: filterTopicNameList) {
+ authorityService.getAuthorityByTopicFromCache(clusterPhyId, topicName)
+ .stream()
+ .map(elem -> elem.getAppId())
+ .filter(item -> !userTopicMap.containsKey(item))
+ .forEach(kafkaUser ->
+ userTopicMap.put(
+ kafkaUser,
+ authorityService.getAuthority(kafkaUser).stream().map(authorityDO -> authorityDO.getTopicName()).collect(Collectors.toSet())
+ )
+ );
+ }
+
+ return userTopicMap;
+ }
+}
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/impl/HaClusterManagerImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/impl/HaClusterManagerImpl.java
new file mode 100644
index 00000000..debc3d96
--- /dev/null
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/impl/HaClusterManagerImpl.java
@@ -0,0 +1,169 @@
+package com.xiaojukeji.kafka.manager.service.biz.ha.impl;
+
+import com.xiaojukeji.kafka.manager.common.bizenum.ClusterModeEnum;
+import com.xiaojukeji.kafka.manager.common.bizenum.DBStatusEnum;
+import com.xiaojukeji.kafka.manager.common.bizenum.ha.HaResTypeEnum;
+import com.xiaojukeji.kafka.manager.common.constant.MsgConstant;
+import com.xiaojukeji.kafka.manager.common.entity.Result;
+import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
+import com.xiaojukeji.kafka.manager.common.entity.ao.ClusterDetailDTO;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.LogicalClusterDO;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.RegionDO;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO;
+import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
+import com.xiaojukeji.kafka.manager.service.biz.ha.HaClusterManager;
+import com.xiaojukeji.kafka.manager.service.service.ClusterService;
+import com.xiaojukeji.kafka.manager.service.service.LogicalClusterService;
+import com.xiaojukeji.kafka.manager.service.service.RegionService;
+import com.xiaojukeji.kafka.manager.service.service.ZookeeperService;
+import com.xiaojukeji.kafka.manager.service.service.ha.HaASRelationService;
+import com.xiaojukeji.kafka.manager.service.service.ha.HaClusterService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.interceptor.TransactionAspectSupport;
+
+import java.util.List;
+
+@Component
+public class HaClusterManagerImpl implements HaClusterManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HaClusterManagerImpl.class);
+
+ @Autowired
+ private ClusterService clusterService;
+
+ @Autowired
+ private HaClusterService haClusterService;
+
+ @Autowired
+ private ZookeeperService zookeeperService;
+
+ @Autowired
+ private LogicalClusterService logicalClusterService;
+
+ @Autowired
+ private RegionService regionService;
+
+ @Autowired
+ private HaASRelationService haASRelationService;
+
+ @Override
+ public List getClusterDetailDTOList(Boolean needDetail) {
+ return clusterService.getClusterDetailDTOList(needDetail);
+ }
+
+ @Override
+ @Transactional
+ public Result addNew(ClusterDO clusterDO, Long activeClusterId, String operator) {
+ if (activeClusterId == null) {
+ // 普通集群,直接写入DB
+ Long clusterPhyId = zookeeperService.getClusterIdAndNullIfFailed(clusterDO.getZookeeper());
+ if (clusterPhyId != null && clusterService.getById(clusterPhyId) == null) {
+ // 该集群ID不存在时,则进行设置,如果已经存在了,则忽略
+ clusterDO.setId(clusterPhyId);
+ }
+
+ return Result.buildFrom(clusterService.addNew(clusterDO, operator));
+ }
+
+ //高可用集群
+ ClusterDO activeClusterDO = clusterService.getById(activeClusterId);
+ if (activeClusterDO == null) {
+ // 主集群不存在
+ return Result.buildFromRSAndMsg(ResultStatus.RESOURCE_NOT_EXIST, MsgConstant.getClusterPhyNotExist(activeClusterId));
+ }
+
+ HaASRelationDO oldRelationDO = haClusterService.getHA(activeClusterId);
+ if (oldRelationDO != null){
+ return Result.buildFromRSAndMsg(ResultStatus.RESOURCE_ALREADY_USED,
+ MsgConstant.getActiveClusterDuplicate(activeClusterDO.getId(), activeClusterDO.getClusterName()));
+ }
+
+ Long standbyClusterPhyId = zookeeperService.getClusterIdAndNullIfFailed(clusterDO.getZookeeper());
+ if (standbyClusterPhyId != null && clusterService.getById(standbyClusterPhyId) == null) {
+ // 该集群ID不存在时,则进行设置,如果已经存在了,则忽略
+ clusterDO.setId(standbyClusterPhyId);
+ }
+
+ ResultStatus rs = clusterService.addNew(clusterDO, operator);
+ if (!ResultStatus.SUCCESS.equals(rs)) {
+ return Result.buildFrom(rs);
+ }
+
+ Result> rli = zookeeperService.getBrokerIds(clusterDO.getZookeeper());
+ if (!rli.hasData()){
+ return Result.buildFrom(ResultStatus.BROKER_NOT_EXIST);
+ }
+
+ // 备集群创建region
+ RegionDO regionDO = new RegionDO(DBStatusEnum.ALIVE.getStatus(), clusterDO.getClusterName(), clusterDO.getId(), ListUtils.intList2String(rli.getData()));
+ rs = regionService.createRegion(regionDO);
+ if (!ResultStatus.SUCCESS.equals(rs)){
+ TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
+
+ return Result.buildFrom(rs);
+ }
+
+ // 备集群创建逻辑集群
+ List logicalClusterDOS = logicalClusterService.getByPhysicalClusterId(activeClusterId);
+ if (!logicalClusterDOS.isEmpty()) {
+ // 有逻辑集群,则对应创建逻辑集群
+ Integer mode = logicalClusterDOS.get(0).getMode();
+ LogicalClusterDO logicalClusterDO = new LogicalClusterDO(
+ clusterDO.getClusterName(),
+ clusterDO.getClusterName(),
+ ClusterModeEnum.INDEPENDENT_MODE.getCode().equals(mode)?mode:ClusterModeEnum.SHARED_MODE.getCode(),
+ ClusterModeEnum.INDEPENDENT_MODE.getCode().equals(mode)?logicalClusterDOS.get(0).getAppId(): "",
+ clusterDO.getId(),
+ regionDO.getId().toString()
+ );
+ ResultStatus clcRS = logicalClusterService.createLogicalCluster(logicalClusterDO);
+ if (clcRS.getCode() != ResultStatus.SUCCESS.getCode()){
+ TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
+ return Result.buildFrom(clcRS);
+ }
+ }
+
+ return haClusterService.createHA(activeClusterId, clusterDO.getId(), operator);
+ }
+
+ @Override
+ @Transactional
+ public Result deleteById(Long clusterId, String operator) {
+ HaASRelationDO haRelationDO = haClusterService.getHA(clusterId);
+ if (haRelationDO == null){
+ return clusterService.deleteById(clusterId, operator);
+ }
+
+ Result rv = checkForDelete(haRelationDO, clusterId);
+ if (rv.failed()){
+ return rv;
+ }
+
+ //解除高可用关系
+ Result result = haClusterService.deleteHA(haRelationDO.getActiveClusterPhyId(), haRelationDO.getStandbyClusterPhyId());
+ if (result.failed()){
+ return result;
+ }
+
+ //删除集群
+ result = clusterService.deleteById(clusterId, operator);
+ if (result.failed()){
+ return result;
+ }
+ return Result.buildSuc();
+ }
+
+ private Result checkForDelete(HaASRelationDO haRelationDO, Long clusterId){
+ List relationDOS = haASRelationService.listAllHAFromDB(haRelationDO.getActiveClusterPhyId(),
+ haRelationDO.getStandbyClusterPhyId(),
+ HaResTypeEnum.TOPIC);
+ if (relationDOS.stream().filter(relationDO -> !relationDO.getActiveResName().startsWith("__")).count() > 0){
+ return Result.buildFromRSAndMsg(ResultStatus.OPERATION_FORBIDDEN, "集群还存在高可topic");
+ }
+ return Result.buildSuc();
+ }
+}
diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/impl/HaTopicManagerImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/impl/HaTopicManagerImpl.java
new file mode 100644
index 00000000..d1224a4b
--- /dev/null
+++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/biz/ha/impl/HaTopicManagerImpl.java
@@ -0,0 +1,559 @@
+package com.xiaojukeji.kafka.manager.service.biz.ha.impl;
+
+import com.xiaojukeji.kafka.manager.common.bizenum.ha.HaResTypeEnum;
+import com.xiaojukeji.kafka.manager.common.bizenum.ha.HaStatusEnum;
+import com.xiaojukeji.kafka.manager.common.constant.MsgConstant;
+import com.xiaojukeji.kafka.manager.common.entity.Result;
+import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
+import com.xiaojukeji.kafka.manager.common.entity.TopicOperationResult;
+import com.xiaojukeji.kafka.manager.common.entity.ao.ha.HaSwitchTopic;
+import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.HaTopicRelationDTO;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.JobLogDO;
+import com.xiaojukeji.kafka.manager.common.utils.BackoffUtils;
+import com.xiaojukeji.kafka.manager.common.utils.ConvertUtil;
+import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
+import com.xiaojukeji.kafka.manager.service.biz.ha.HaTopicManager;
+import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
+import com.xiaojukeji.kafka.manager.service.service.ClusterService;
+import com.xiaojukeji.kafka.manager.service.service.JobLogService;
+import com.xiaojukeji.kafka.manager.service.service.TopicManagerService;
+import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService;
+import com.xiaojukeji.kafka.manager.service.service.ha.HaASRelationService;
+import com.xiaojukeji.kafka.manager.service.service.ha.HaKafkaUserService;
+import com.xiaojukeji.kafka.manager.service.service.ha.HaTopicService;
+import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+@Component
+public class HaTopicManagerImpl implements HaTopicManager {
+ private static final Logger LOGGER = LoggerFactory.getLogger(HaTopicManagerImpl.class);
+
+ @Autowired
+ private ClusterService clusterService;
+
+ @Autowired
+ private AuthorityService authorityService;
+
+ @Autowired
+ private HaTopicService haTopicService;
+
+ @Autowired
+ private HaKafkaUserService haKafkaUserService;
+
+ @Autowired
+ private HaASRelationService haASRelationService;
+
+ @Autowired
+ private TopicManagerService topicManagerService;
+
+ @Autowired
+ private ConfigUtils configUtils;
+
+ @Autowired
+ private JobLogService jobLogService;
+
+    @Override
+    public Result<HaSwitchTopic> switchHaWithCanRetry(Long newActiveClusterPhyId,
+                                                      Long newStandbyClusterPhyId,
+                                                      List<String> switchTopicNameList,
+                                                      boolean focus,
+                                                      boolean firstTriggerExecute,
+                                                      JobLogDO switchLogTemplate,
+                                                      String operator) {
+        LOGGER.info(
+                "method=switchHaWithCanRetry||newActiveClusterPhyId={}||newStandbyClusterPhyId={}||switchTopicNameList={}||focus={}||operator={}",
+                newActiveClusterPhyId, newStandbyClusterPhyId, ConvertUtil.obj2Json(switchTopicNameList), focus, operator
+        );
+
+        // 1. Load both clusters; fail fast when either does not exist.
+        ClusterDO newActiveClusterPhyDO = clusterService.getById(newActiveClusterPhyId);
+        if (ValidateUtils.isNull(newActiveClusterPhyDO)) {
+            return Result.buildFromRSAndMsg(ResultStatus.RESOURCE_NOT_EXIST, MsgConstant.getClusterPhyNotExist(newActiveClusterPhyId));
+        }
+
+        ClusterDO newStandbyClusterPhyDO = clusterService.getById(newStandbyClusterPhyId);
+        if (ValidateUtils.isNull(newStandbyClusterPhyDO)) {
+            return Result.buildFromRSAndMsg(ResultStatus.RESOURCE_NOT_EXIST, MsgConstant.getClusterPhyNotExist(newStandbyClusterPhyId));
+        }
+
+        // 2. Validate parameters and load the active/standby relations to be switched.
+        Result<List<HaASRelationDO>> doListResult = this.checkParamAndGetASRelation(newActiveClusterPhyId, newStandbyClusterPhyId, switchTopicNameList);
+        if (doListResult.failed()) {
+            LOGGER.error(
+                    "method=switchHaWithCanRetry||newActiveClusterPhyId={}||newStandbyClusterPhyId={}||switchTopicNameList={}||paramErrResult={}||operator={}",
+                    newActiveClusterPhyId, newStandbyClusterPhyId, ConvertUtil.obj2Json(switchTopicNameList), doListResult, operator
+            );
+
+            return Result.buildFromIgnoreData(doListResult);
+        }
+        List<HaASRelationDO> doList = doListResult.getData();
+
+        // 3. On the first trigger, move stable relations into the switching-prepare state.
+        for (HaASRelationDO relationDO: doList) {
+            if (firstTriggerExecute && relationDO.getStatus().equals(HaStatusEnum.STABLE_CODE)) {
+                relationDO.setStatus(HaStatusEnum.SWITCHING_PREPARE_CODE);
+                haASRelationService.updateRelationStatus(relationDO.getId(), HaStatusEnum.SWITCHING_PREPARE_CODE);
+            }
+        }
+
+        // 4. Pre-switch work (the new standby is the old active cluster).
+        HaSwitchTopic switchTopic = this.prepareSwitching(newStandbyClusterPhyDO, doList, focus, switchLogTemplate);
+
+        // 5. Wait 10 seconds so in-flight replication has a chance to catch up.
+        BackoffUtils.backoff(10000);
+
+        // 6. Check replication progress for every topic.
+        for (HaASRelationDO relationDO: doList) {
+            switchTopic.addHaSwitchTopic(this.checkTopicInSync(newActiveClusterPhyDO, newStandbyClusterPhyDO, relationDO, focus, switchLogTemplate));
+        }
+
+        // 7. Remove the fetch config of the old standby topics.
+        for (HaASRelationDO relationDO: doList) {
+            switchTopic.addHaSwitchTopic(this.oldStandbyTopicDelFetchConfig(newActiveClusterPhyDO, newStandbyClusterPhyDO, relationDO, focus, switchLogTemplate, operator));
+        }
+
+        // 8. Add the fetch config for the new standby topics.
+        switchTopic.addHaSwitchTopic(this.newStandbyTopicAddFetchConfig(newActiveClusterPhyDO, newStandbyClusterPhyDO, doList, focus, switchLogTemplate, operator));
+
+        // 9. Closeout: repoint the related KafkaUsers (old active, old standby, gateway) at the new active cluster.
+        switchTopic.addHaSwitchTopic(this.closeoutSwitching(newActiveClusterPhyDO, newStandbyClusterPhyDO, configUtils.getDKafkaGatewayZK(), doList, focus, switchLogTemplate));
+
+        // 10. Aggregate the per-topic status into the result.
+        doList.forEach(elem -> switchTopic.addActiveTopicStatus(elem.getActiveResName(), elem.getStatus()));
+
+        // 11. Log and return.
+        LOGGER.info(
+                "method=switchHaWithCanRetry||newActiveClusterPhyId={}||newStandbyClusterPhyId={}||switchTopicNameList={}||switchResult={}||operator={}",
+                newActiveClusterPhyId, newStandbyClusterPhyId, ConvertUtil.obj2Json(switchTopicNameList), switchTopic, operator
+        );
+
+        return Result.buildSuc(switchTopic);
+    }
+
+ @Override
+ public Result> batchCreateHaTopic(HaTopicRelationDTO dto, String operator) {
+ List relationDOS = haASRelationService.listAllHAFromDB(dto.getActiveClusterId(), dto.getStandbyClusterId(), HaResTypeEnum.CLUSTER);
+ if (relationDOS.isEmpty()){
+ return Result.buildFromRSAndMsg(ResultStatus.RESOURCE_NOT_EXIST, "集群高可用关系未建立");
+ }
+
+ //获取主集群已有的高可用topic
+ Map haRelationMap = haTopicService.getRelation(dto.getActiveClusterId());
+ List topicNames = dto.getTopicNames();
+ if (dto.getAll()){
+ topicNames = topicManagerService.getByClusterId(dto.getActiveClusterId())
+ .stream()
+ .filter(topicDO -> !topicDO.getTopicName().startsWith("__"))//过滤掉kafka自带topic
+ .filter(topicDO -> !haRelationMap.keySet().contains(topicDO.getTopicName()))//过滤调已成为高可用topic的topic
+ .filter(topicDO -> PhysicalClusterMetadataManager.isTopicExist(dto.getActiveClusterId(), topicDO.getTopicName()))
+ .map(TopicDO::getTopicName)
+ .collect(Collectors.toList());
+
+ }
+
+ List operationResultList = new ArrayList<>();
+ topicNames.forEach(topicName->{
+ Result rv = haTopicService.createHA(dto.getActiveClusterId(), dto.getStandbyClusterId(),topicName, operator);
+ operationResultList.add(TopicOperationResult.buildFrom(dto.getActiveClusterId(), topicName, rv));
+ });
+
+ return Result.buildSuc(operationResultList);
+ }
+
+ @Override
+ public Result> batchRemoveHaTopic(HaTopicRelationDTO dto, String operator) {
+ List relationDOS = haASRelationService.listAllHAFromDB(dto.getActiveClusterId(), dto.getStandbyClusterId(), HaResTypeEnum.CLUSTER);
+ if (relationDOS.isEmpty()){
+ return Result.buildFromRSAndMsg(ResultStatus.RESOURCE_NOT_EXIST, "集群高可用关系未建立");
+ }
+
+ List operationResultList = new ArrayList<>();
+ for(String topicName : dto.getTopicNames()){
+ HaASRelationDO relationDO = haASRelationService.getHAFromDB(
+ dto.getActiveClusterId(),
+ topicName,
+ HaResTypeEnum.TOPIC
+ );
+ if (relationDO == null) {
+ return Result.buildFromRSAndMsg(ResultStatus.RESOURCE_NOT_EXIST, "主备关系不存在");
+ }
+
+ Result rv = haTopicService.deleteHA(relationDO.getActiveClusterPhyId(), relationDO.getStandbyClusterPhyId(), topicName, operator);
+ operationResultList.add(TopicOperationResult.buildFrom(dto.getActiveClusterId(), topicName, rv));
+ }
+
+ return Result.buildSuc(operationResultList);
+ }
+
+ /**************************************************** private method ****************************************************/
+
+    private void saveLogs(JobLogDO switchLogTemplate, String content) {
+        // Clone the template with the current timestamp and message, then persist it
+        // (the log service swallows any exception, so this never interrupts a switch).
+        JobLogDO logEntry = switchLogTemplate.setAndCopyNew(new Date(), content);
+        jobLogService.addLogAndIgnoreException(logEntry);
+    }
+
+    /**
+     * Pre-switch step:
+     * on the old active cluster, set the active cluster of every KafkaUser related
+     * to the switching topics to "None", then advance each relation to WAITING_IN_SYNC.
+     * Returns finished=false (caller retries later) when an operation fails and force is off.
+     *
+     * @param oldActiveClusterPhyDO the old active cluster (i.e. the new standby after the switch)
+     */
+    private HaSwitchTopic prepareSwitching(ClusterDO oldActiveClusterPhyDO, List<HaASRelationDO> doList, boolean focus, JobLogDO switchLogTemplate) {
+        // KafkaUsers whose HA has already been paused during this pass
+        Set<String> stoppedHaKafkaUserSet = new HashSet<>();
+
+        HaSwitchTopic haSwitchTopic = new HaSwitchTopic(true);
+
+        boolean allSuccess = true;  // every operation succeeded
+        boolean needLog = false;    // at least one relation was processed in this step
+        for (HaASRelationDO relationDO: doList) {
+            if (!relationDO.getStatus().equals(HaStatusEnum.SWITCHING_PREPARE_CODE)) {
+                // not in the prepare state — nothing to do for this relation
+                haSwitchTopic.setFinished(true);
+                continue;
+            }
+            needLog = true;
+
+            // KafkaUsers related to this topic that have not been paused yet
+            Set<String> relatedKafkaUserSet = authorityService.getAuthorityByTopic(relationDO.getActiveClusterPhyId(), relationDO.getActiveResName())
+                    .stream()
+                    .map(elem -> elem.getAppId())
+                    .filter(kafkaUser -> !stoppedHaKafkaUserSet.contains(kafkaUser))
+                    .collect(Collectors.toSet());
+
+            // pause HA for each related KafkaUser
+            for (String kafkaUser: relatedKafkaUserSet) {
+                Result rv = haKafkaUserService.setNoneHAInKafka(oldActiveClusterPhyDO.getZookeeper(), kafkaUser);
+                if (rv.failed() && !focus) {
+                    haSwitchTopic.setFinished(false);
+
+                    this.saveLogs(switchLogTemplate, String.format("%s:\t失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_PREPARE.getMsg(oldActiveClusterPhyDO.getClusterName())));
+                    return haSwitchTopic;
+                } else if (rv.failed() && focus) {
+                    // forced switch: remember the failure but keep going
+                    allSuccess = false;
+                }
+            }
+
+            // remember the users handled so far
+            stoppedHaKafkaUserSet.addAll(relatedKafkaUserSet);
+
+            // advance the relation to the next switching state
+            relationDO.setStatus(HaStatusEnum.SWITCHING_WAITING_IN_SYNC_CODE);
+            haASRelationService.updateRelationStatus(relationDO.getId(), HaStatusEnum.SWITCHING_WAITING_IN_SYNC_CODE);
+        }
+
+        if (needLog) {
+            this.saveLogs(switchLogTemplate, String.format("%s:\t%s", HaStatusEnum.SWITCHING_PREPARE.getMsg(oldActiveClusterPhyDO.getClusterName()), allSuccess? "成功": "存在失败,但进行强制执行,跳过该操作"));
+        }
+
+        haSwitchTopic.setFinished(true);
+        return haSwitchTopic;
+    }
+
+    /**
+     * Wait for the standby topic to catch up with the active topic.
+     * A forced switch (focus=true) skips the lag check entirely.
+     * Returns finished=false when the lag cannot be read or is still above zero,
+     * so the caller retries one minute later.
+     */
+    private HaSwitchTopic checkTopicInSync(ClusterDO newActiveClusterPhyDO, ClusterDO newStandbyClusterPhyDO, HaASRelationDO relationDO, boolean focus, JobLogDO switchLogTemplate) {
+        HaSwitchTopic haSwitchTopic = new HaSwitchTopic(true);
+        if (!relationDO.getStatus().equals(HaStatusEnum.SWITCHING_WAITING_IN_SYNC_CODE)) {
+            // wrong state for this step — skip this relation
+            haSwitchTopic.setFinished(true);
+            return haSwitchTopic;
+        }
+
+        if (focus) {
+            // forced switch: no need to wait for in-sync
+
+            // advance the relation to the next switching state
+            relationDO.setStatus(HaStatusEnum.SWITCHING_CLOSE_OLD_STANDBY_TOPIC_FETCH_CODE);
+            haASRelationService.updateRelationStatus(relationDO.getId(), HaStatusEnum.SWITCHING_CLOSE_OLD_STANDBY_TOPIC_FETCH_CODE);
+
+            haSwitchTopic.setFinished(true);
+            this.saveLogs(switchLogTemplate, String.format(
+                    "%s:\tTopic:[%s] 强制切换,跳过等待主备同步完成,直接进入下一步",
+                    HaStatusEnum.SWITCHING_WAITING_IN_SYNC.getMsg(newActiveClusterPhyDO.getClusterName()),
+                    relationDO.getActiveResName()
+            ));
+            return haSwitchTopic;
+        }
+
+        Result<Long> lagResult = haTopicService.getStandbyTopicFetchLag(newStandbyClusterPhyDO.getId(), relationDO.getStandbyResName());
+        if (lagResult.failed()) {
+            // could not read the replication lag
+            this.saveLogs(switchLogTemplate, String.format(
+                    "%s:\tTopic:[%s] 获取同步的Lag信息失败,1分钟后再检查是否主备同步完成",
+                    HaStatusEnum.SWITCHING_WAITING_IN_SYNC.getMsg(newActiveClusterPhyDO.getClusterName()),
+                    relationDO.getActiveResName()
+            ));
+            haSwitchTopic.setFinished(false);
+            return haSwitchTopic;
+        }
+
+        if (lagResult.getData().longValue() > 0) {
+            // standby still behind: report unfinished so the caller retries
+            this.saveLogs(switchLogTemplate, String.format(
+                    "%s:\tTopic:[%s] 还存在 %d 条数据未同步完成,1分钟后再检查是否主备同步完成",
+                    HaStatusEnum.SWITCHING_WAITING_IN_SYNC.getMsg(newActiveClusterPhyDO.getClusterName()),
+                    relationDO.getActiveResName(),
+                    lagResult.getData()
+            ));
+
+            haSwitchTopic.setFinished(false);
+            return haSwitchTopic;
+        }
+
+        // fully in sync: advance the relation to the next switching state
+        relationDO.setStatus(HaStatusEnum.SWITCHING_CLOSE_OLD_STANDBY_TOPIC_FETCH_CODE);
+        haASRelationService.updateRelationStatus(relationDO.getId(), HaStatusEnum.SWITCHING_CLOSE_OLD_STANDBY_TOPIC_FETCH_CODE);
+
+        haSwitchTopic.setFinished(true);
+        this.saveLogs(switchLogTemplate, String.format(
+                "%s:\tTopic:[%s] 主备同步完成",
+                HaStatusEnum.SWITCHING_WAITING_IN_SYNC.getMsg(newActiveClusterPhyDO.getClusterName()),
+                relationDO.getActiveResName()
+        ));
+        return haSwitchTopic;
+    }
+
+ /**
+ * Remove, from the old standby topic, the replication-fetch config that pulled data from the old active topic.
+ * Skips relations that are not in the SWITCHING_CLOSE_OLD_STANDBY_TOPIC_FETCH state; on success
+ * (or a forced skip) the relation is advanced to SWITCHING_OPEN_NEW_STANDBY_TOPIC_FETCH.
+ * Returns finished=false when the operation fails and force (focus) is off, so the caller retries one minute later.
+ */
+ private HaSwitchTopic oldStandbyTopicDelFetchConfig(ClusterDO newActiveClusterPhyDO, ClusterDO newStandbyClusterPhyDO, HaASRelationDO relationDO, boolean focus, JobLogDO switchLogTemplate, String operator) {
+ HaSwitchTopic haSwitchTopic = new HaSwitchTopic(true);
+ if (!relationDO.getStatus().equals(HaStatusEnum.SWITCHING_CLOSE_OLD_STANDBY_TOPIC_FETCH_CODE)) {
+ // wrong state for this step -- skip this relation
+ haSwitchTopic.setFinished(true);
+ return haSwitchTopic;
+ }
+
+ // NOTE(review): after the roles flip, the old standby topic lives on the new active cluster -- confirm against stopHAInKafka's contract
+ Result rv = haTopicService.stopHAInKafka(
+ newActiveClusterPhyDO, relationDO.getStandbyResName(), // the old standby topic
+ operator
+ );
+ if (rv.failed() && !focus) {
+ // failed and not forced: log and report unfinished so the caller retries in one minute
+ this.saveLogs(switchLogTemplate, String.format("%s:\tTopic:[%s] 失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSE_OLD_STANDBY_TOPIC_FETCH.getMsg(newActiveClusterPhyDO.getClusterName()), relationDO.getActiveResName()));
+ haSwitchTopic.setFinished(false);
+ return haSwitchTopic;
+ } else if (rv.failed() && focus) {
+ // failed but forced: log and fall through to the state update
+ this.saveLogs(switchLogTemplate, String.format("%s:\tTopic:[%s] 失败,但进行强制执行,跳过该操作", HaStatusEnum.SWITCHING_CLOSE_OLD_STANDBY_TOPIC_FETCH.getMsg(newActiveClusterPhyDO.getClusterName()), relationDO.getActiveResName()));
+ } else {
+ // success
+ this.saveLogs(switchLogTemplate, String.format("%s:\tTopic:[%s] 成功", HaStatusEnum.SWITCHING_CLOSE_OLD_STANDBY_TOPIC_FETCH.getMsg(newActiveClusterPhyDO.getClusterName()), relationDO.getActiveResName()));
+ }
+
+ // advance the relation to the next switching state
+ relationDO.setStatus(HaStatusEnum.SWITCHING_OPEN_NEW_STANDBY_TOPIC_FETCH_CODE);
+ haASRelationService.updateRelationStatus(relationDO.getId(), HaStatusEnum.SWITCHING_OPEN_NEW_STANDBY_TOPIC_FETCH_CODE);
+
+ haSwitchTopic.setFinished(true);
+ return haSwitchTopic;
+ }
+
+    /**
+     * Add, on the new standby topic, the config that fetches data from the new active topic.
+     * During a forced switch, once one ZK operation has failed the remaining ones are
+     * skipped (they would fail the same way) while the relation state is still advanced.
+     */
+    private HaSwitchTopic newStandbyTopicAddFetchConfig(ClusterDO newActiveClusterPhyDO,
+                                                        ClusterDO newStandbyClusterPhyDO,
+                                                        List<HaASRelationDO> doList,
+                                                        boolean focus,
+                                                        JobLogDO switchLogTemplate,
+                                                        String operator) {
+        boolean forceAndFailed = false; // a forced switch already hit a failure
+        for (HaASRelationDO relationDO: doList) {
+            if (!relationDO.getStatus().equals(HaStatusEnum.SWITCHING_OPEN_NEW_STANDBY_TOPIC_FETCH_CODE)) {
+                // wrong state for this step — skip this relation
+                continue;
+            }
+
+            Result rv = null;
+            if (!forceAndFailed) {
+                // only attempt while no forced failure has occurred yet
+                rv = haTopicService.activeHAInKafka(
+                        newActiveClusterPhyDO, relationDO.getStandbyResName(),
+                        newStandbyClusterPhyDO, relationDO.getStandbyResName(),
+                        operator
+                );
+            }
+
+            if (forceAndFailed) {
+                // forced and already failed earlier: just log and skip
+                this.saveLogs(switchLogTemplate, String.format("%s:\tTopic:[%s] 失败,但因为是强制执行且强制执行时依旧出现操作失败,因此直接跳过该操作", HaStatusEnum.SWITCHING_OPEN_NEW_STANDBY_TOPIC_FETCH.getMsg(newStandbyClusterPhyDO.getClusterName()), relationDO.getActiveResName()));
+
+            } else if (rv.failed() && !focus) {
+                // failed and not forced: return immediately, caller retries later
+                this.saveLogs(switchLogTemplate, String.format("%s:\tTopic:[%s] 失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_OPEN_NEW_STANDBY_TOPIC_FETCH.getMsg(newStandbyClusterPhyDO.getClusterName()), relationDO.getActiveResName()));
+
+                return new HaSwitchTopic(false);
+            } else if (rv.failed() && focus) {
+                // failed but forced: log, remember the failure, keep going
+                this.saveLogs(switchLogTemplate, String.format("%s:\tTopic:[%s] 失败,但因为是强制执行,因此跳过该操作", HaStatusEnum.SWITCHING_OPEN_NEW_STANDBY_TOPIC_FETCH.getMsg(newStandbyClusterPhyDO.getClusterName()), relationDO.getActiveResName()));
+
+                forceAndFailed = true;
+            } else {
+                // success
+                this.saveLogs(switchLogTemplate, String.format("%s:\tTopic:[%s] 成功", HaStatusEnum.SWITCHING_OPEN_NEW_STANDBY_TOPIC_FETCH.getMsg(newStandbyClusterPhyDO.getClusterName()), relationDO.getActiveResName()));
+            }
+
+            // advance the relation to the closeout state
+            relationDO.setStatus(HaStatusEnum.SWITCHING_CLOSEOUT_CODE);
+            haASRelationService.updateRelationStatus(relationDO.getId(), HaStatusEnum.SWITCHING_CLOSEOUT_CODE);
+        }
+
+        return new HaSwitchTopic(true);
+    }
+
+    /**
+     * Closeout step. For every KafkaUser related to the switched topics:
+     * 1. on the old active cluster  - point the user's active cluster at the new active cluster
+     * 2. on the old standby cluster - same
+     * 3. on the gateway             - same
+     * Then rewrite each relation with the roles swapped and the state back to STABLE.
+     */
+    private HaSwitchTopic closeoutSwitching(ClusterDO newActiveClusterPhyDO, ClusterDO newStandbyClusterPhyDO, String gatewayZK, List<HaASRelationDO> doList, boolean focus, JobLogDO switchLogTemplate) {
+        // KafkaUsers already re-activated during this pass
+        Set<String> activeHaKafkaUserSet = new HashSet<>();
+
+        boolean allSuccess = true;
+        boolean needLog = false;
+        boolean forceAndNewStandbyFailed = false; // forced switch, but the new standby operation still failed
+
+        HaSwitchTopic haSwitchTopic = new HaSwitchTopic(true);
+        for (HaASRelationDO relationDO: doList) {
+            if (!relationDO.getStatus().equals(HaStatusEnum.SWITCHING_CLOSEOUT_CODE)) {
+                // not in the closeout state yet
+                haSwitchTopic.setFinished(false);
+                continue;
+            }
+
+            needLog = true;
+
+            // KafkaUsers related to this topic that have not been re-activated yet
+            Set<String> relatedKafkaUserSet = authorityService.getAuthorityByTopic(relationDO.getActiveClusterPhyId(), relationDO.getActiveResName())
+                    .stream()
+                    .map(elem -> elem.getAppId())
+                    .filter(kafkaUser -> !activeHaKafkaUserSet.contains(kafkaUser))
+                    .collect(Collectors.toSet());
+
+            for (String kafkaUser: relatedKafkaUserSet) {
+                // 1. the new active cluster
+                Result rv = haKafkaUserService.activeHAInKafka(newActiveClusterPhyDO.getZookeeper(), newActiveClusterPhyDO.getId(), kafkaUser);
+                if (rv.failed() && !focus) {
+                    haSwitchTopic.setFinished(false);
+                    this.saveLogs(switchLogTemplate, String.format("%s:\t失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName())));
+                    return haSwitchTopic;
+                } else if (rv.failed() && focus) {
+                    allSuccess = false;
+                }
+
+                // 2. the new standby cluster — its topics matter less, so after one failure
+                //    (under force) the remaining ZK writes for it are skipped
+                rv = null;
+                if (!forceAndNewStandbyFailed) {
+                    rv = haKafkaUserService.activeHAInKafka(newStandbyClusterPhyDO.getZookeeper(), newActiveClusterPhyDO.getId(), kafkaUser);
+                }
+
+                if (rv != null && rv.failed() && !focus) {
+                    haSwitchTopic.setFinished(false);
+                    this.saveLogs(switchLogTemplate, String.format("%s:\t失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName())));
+                    return haSwitchTopic;
+                } else if (rv != null && rv.failed() && focus) {
+                    allSuccess = false;
+                    forceAndNewStandbyFailed = true;
+                }
+
+                // 3. the gateway
+                rv = haKafkaUserService.activeHAInKafka(gatewayZK, newActiveClusterPhyDO.getId(), kafkaUser);
+                if (rv.failed() && !focus) {
+                    haSwitchTopic.setFinished(false);
+                    this.saveLogs(switchLogTemplate, String.format("%s:\t失败,1分钟后再进行重试", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName())));
+                    return haSwitchTopic;
+                } else if (rv.failed() && focus) {
+                    allSuccess = false;
+                }
+            }
+
+            // remember the users that have been re-activated
+            activeHaKafkaUserSet.addAll(relatedKafkaUserSet);
+
+            // rewrite the relation: roles swapped, state back to STABLE
+            HaASRelationDO newHaASRelationDO = new HaASRelationDO(
+                    newActiveClusterPhyDO.getId(), relationDO.getActiveResName(),
+                    newStandbyClusterPhyDO.getId(), relationDO.getStandbyResName(),
+                    HaResTypeEnum.TOPIC.getCode(),
+                    HaStatusEnum.STABLE_CODE
+            );
+            newHaASRelationDO.setId(relationDO.getId());
+
+            haASRelationService.updateById(newHaASRelationDO);
+        }
+
+        if (!needLog) {
+            return haSwitchTopic;
+        }
+
+        this.saveLogs(switchLogTemplate, String.format("%s:\t%s", HaStatusEnum.SWITCHING_CLOSEOUT.getMsg(newActiveClusterPhyDO.getClusterName()), allSuccess? "成功": "存在失败,但进行强制执行,跳过该操作"));
+        return haSwitchTopic;
+    }
+
+ /**
+ * 检查参数,并获取主备关系信息
+ */
+ private Result