[Optimize] 新增/编辑MM2 Topic 由当前集群获取改为对应的sourceKafka集群获取& 新增/编辑MM2入参优化(#894)

This commit is contained in:
erge
2023-02-20 16:31:35 +08:00
committed by lucasun
parent 5c26e8947b
commit ae8cc3092b
5 changed files with 93 additions and 60 deletions

View File

@@ -1,5 +1,5 @@
import React, { createContext, createElement, forwardRef, useContext, useEffect, useImperativeHandle, useRef, useState } from 'react';
import { Alert, Button, Drawer, Form, Input, InputNumber, Radio, Select, Steps, Switch, Table, Transfer, Utils } from 'knowdesign';
import { Alert, Button, Drawer, Form, Input, InputNumber, Radio, Select, Spin, Steps, Switch, Table, Transfer, Utils } from 'knowdesign';
import { FormInstance } from 'knowdesign/es/basic/form/Form';
import message from '@src/components/Message';
import api from '@src/api';
@@ -136,6 +136,8 @@ const StepFormFirst = (props: SubFormProps) => {
}>();
const [form] = useStepForm(0);
const [topicData, setTopicData] = useState([]);
const [topicDataLoading, setTopicDataLoading] = useState(false);
const [givenSourceKafkaId, setGivenSourceKafkaId] = useState(clusterId);
const { type, detail, setSourceKafkaClusterId, setBootstrapServers, setSourceDetailConfigs, setCheckoutPointDetailConfigs } =
useContext(StepsFormContent);
const isEdit = type === 'edit';
@@ -178,21 +180,26 @@ const StepFormFirst = (props: SubFormProps) => {
};
// 获取Topic列表
const getTopicList = () => {
const getTopicList = (givenSourceKafkaId: any) => {
// ! 需整理
Utils.request(api.getTopicMetaList(Number(clusterId))).then((res: any) => {
const dataDe = res || [];
const dataHandle = dataDe.map((item: any) => {
return {
...item,
key: item.topicName,
label: item.topicName,
value: item.topicName,
title: item.topicName,
};
setTopicDataLoading(true);
Utils.request(api.getTopicMetaList(Number(givenSourceKafkaId)))
.then((res: any) => {
const dataDe = res || [];
const dataHandle = dataDe.map((item: any) => {
return {
...item,
key: item.topicName,
label: item.topicName,
value: item.topicName,
title: item.topicName,
};
});
setTopicData(dataHandle);
})
.finally(() => {
setTopicDataLoading(false);
});
setTopicData(dataHandle);
});
};
const getMM2Config = (connectClusterId: string | number) => {
@@ -207,7 +214,6 @@ const StepFormFirst = (props: SubFormProps) => {
const detailConfigs: any[] = isEdit && res.length > 1 ? res?.[2] : [];
let sourceConfigs: any;
let checkpointConfigs: any;
detailConfigs?.forEach((config) => {
if (config['connector.class'] === 'org.apache.kafka.connect.mirror.MirrorCheckpointConnector') {
checkpointConfigs = config;
@@ -215,6 +221,7 @@ const StepFormFirst = (props: SubFormProps) => {
} else if (config['connector.class'] === 'org.apache.kafka.connect.mirror.MirrorSourceConnector') {
sourceConfigs = config;
setSourceDetailConfigs(config);
setGivenSourceKafkaId(sourceConfigs['source.cluster.alias']);
}
});
const formItemValue: any = {};
@@ -222,36 +229,47 @@ const StepFormFirst = (props: SubFormProps) => {
res?.[0].configs.forEach(({ definition }: any) => {
if (existConfigItems.sourceConfigs.includes(definition.name)) {
if (isEdit && sourceConfigs[definition.name]) {
formItemValue[definition.name] = sourceConfigs[definition.name];
if (definition.name === 'topics') {
sourceConfigs[definition.name] !== '.*'
? formItemValues.push({
formItemValue[definition.name] = topicTargetKeys || sourceConfigs[definition.name];
if (sourceConfigs[definition.name] === '.*' && topicTargetKeys.length < 1) {
formItemValues.push({
name: 'priority',
value: 'allTopic',
});
} else {
formItemValues.push(
{
name: 'topics',
value: sourceConfigs[definition.name].split(',') || null,
})
: formItemValues.push({
name: 'priority',
value: 'allTopic',
});
value: topicTargetKeys.length > 0 ? topicTargetKeys : sourceConfigs[definition.name].split(','),
},
{ name: 'priority', value: 'givenTopic' }
);
topicTargetKeys.length < 1 && setTopicTargetKeys(sourceConfigs[definition.name].split(','));
}
} else {
formItemValue[definition.name] = sourceConfigs[definition.name];
formItemValues.push({
name: definition.name,
value: sourceConfigs[definition.name] || null,
});
}
} else {
formItemValue[definition.name] = definition.defaultValue;
if (definition.name === 'topics') {
definition.defaultValue !== '.*'
formItemValue['topics'] = topicTargetKeys || definition.defaultValue.split(',');
definition.defaultValue === '.*' && topicTargetKeys.length < 1
? formItemValues.push({
name: 'topics',
value: definition.defaultValue.split(',') || null,
})
: formItemValues.push({
name: 'priority',
value: 'allTopic',
});
})
: formItemValues.push(
{
name: 'topics',
value: topicTargetKeys || definition.defaultValue,
},
{ name: 'priority', value: 'givenTopic' }
);
} else {
formItemValue[definition.name] = definition.defaultValue;
formItemValues.push({
name: definition.name,
value: definition.defaultValue || null,
@@ -290,11 +308,14 @@ const StepFormFirst = (props: SubFormProps) => {
};
useEffect(() => {
getTopicList();
getConnectClustersList();
getSourceKafkaClustersList();
}, []);
useEffect(() => {
getTopicList(givenSourceKafkaId);
}, [givenSourceKafkaId]);
useEffect(() => {
form.resetFields(existConfigItems.sourceConfigs);
form.setFields(formItemValues);
@@ -306,6 +327,9 @@ const StepFormFirst = (props: SubFormProps) => {
setBootstrapServers(bootstrapServers);
}, [sourcekafkaClustersId]);
useEffect(() => {
form.setFieldsValue({ topics: topicTargetKeys });
}, [topicTargetKeys]);
// useEffect(() => {
// connectorConfig &&
// form.setFieldsValue({
@@ -332,7 +356,6 @@ const StepFormFirst = (props: SubFormProps) => {
'emit.checkpoints.interval.seconds': 60,
'checkpoints.topic.replication.factor': false,
'replication.policy.separator': '.',
priority: 'allTopic',
};
form.setFieldsValue(config);
setFormItemValue((state: any) => {
@@ -366,9 +389,7 @@ const StepFormFirst = (props: SubFormProps) => {
return Promise.reject('MM2任务名称长度限制在164字符');
}
if (!new RegExp(regClusterName).test(value)) {
return Promise.reject(
'MM2 名称支持中英文、数字、特殊字符 ! " # $ % & \' ( ) * + , - . / : ; < = > ? @ [ ] ^ _ ` { | } ~'
);
return Promise.reject("MM2 名称支持中英文、数字、特殊字符 ! # $ % & ' ( ) * + , - . / : ; < = > ? @ [ ] ^ _ ` { | } ~");
}
return Promise.resolve();
// return Utils.request(api.isConnectorExist(prevForm.getFieldValue('connectClusterId'), value)).then(
@@ -398,6 +419,7 @@ const StepFormFirst = (props: SubFormProps) => {
onChange={(e, option: any) => {
setSourcekafkaClustersId(e);
setSourceKafkaClusterId(option?.value);
setGivenSourceKafkaId(option?.value);
}}
/>
</Form.Item>
@@ -421,22 +443,24 @@ const StepFormFirst = (props: SubFormProps) => {
},
]}
>
<Transfer
dataSource={topicData}
titles={['待选Topic', '已选Topic']}
customHeader
showSelectedCount
locale={{
itemUnit: '',
itemsUnit: '',
}}
showSearch
filterOption={(inputValue, option) => option.topicName.indexOf(inputValue) > -1}
targetKeys={topicTargetKeys}
onChange={topicChange}
render={(item) => item.title}
suffix={<IconFont type="icon-fangdajing" />}
/>
<Spin spinning={topicDataLoading}>
<Transfer
dataSource={topicData}
titles={['待选Topic', '已选Topic']}
customHeader
showSelectedCount
locale={{
itemUnit: '',
itemsUnit: '',
}}
showSearch
filterOption={(inputValue, option) => option.topicName.indexOf(inputValue) > -1}
targetKeys={topicTargetKeys}
onChange={topicChange}
render={(item) => item.title}
suffix={<IconFont type="icon-fangdajing" />}
/>
</Spin>
</Form.Item>
) : null;
}}
@@ -1006,7 +1030,7 @@ export default forwardRef(
'emit.checkpoints.enabled': result['emit.checkpoints.enabled'],
'emit.checkpoints.interval.seconds': result['emit.checkpoints.interval.seconds'],
'checkpoints.topic.replication.factor': result['checkpoints.topic.replication.factor'],
'source.cluster.alias': sourceKafkaClusterId,
'source.cluster.alias': sourceKafkaClusterId || res[0].sourceKafkaClusterId,
name: detail?.checkpointConnector || result.name,
'source.cluster.bootstrap.servers': bootstrapServers || checkoutPointDetailConfigs?.['source.cluster.bootstrap.servers'],
};
@@ -1015,7 +1039,7 @@ export default forwardRef(
'connector.class': 'org.apache.kafka.connect.mirror.MirrorHeartbeatConnector',
'heartbeats.topic.replication.factor': result['heartbeats.topic.replication.factor'],
'emit.heartbeats.interval.seconds': result['emit.heartbeats.interval.seconds'],
'source.cluster.alias': sourceKafkaClusterId,
'source.cluster.alias': sourceKafkaClusterId || res[0].sourceKafkaClusterId,
name: detail?.heartbeatConnector || result.name,
'source.cluster.bootstrap.servers': bootstrapServers || heartbeatDetailConfigs?.['source.cluster.bootstrap.servers'],
};
@@ -1036,7 +1060,7 @@ export default forwardRef(
'replication.policy.class': result['replication.policy.class'],
'replication.policy.separator': result['replication.policy.separator'],
topics: result['priority'] === 'givenTopic' ? result['topics'].join() : '.*',
'source.cluster.alias': sourceKafkaClusterId,
'source.cluster.alias': sourceKafkaClusterId || res[0].sourceKafkaClusterId,
name: result.name,
'source.cluster.bootstrap.servers': bootstrapServers || sourceDetailConfigs?.['source.cluster.bootstrap.servers'],
};