[Feature]MM2管理-Connector元信息管理优化(#894)

This commit is contained in:
zengqiao
2023-02-09 16:57:26 +08:00
committed by EricZeng
parent e5c6d00438
commit fbcf58e19c

View File

@@ -27,7 +27,6 @@ public class SyncConnectorTask extends AbstractAsyncMetadataDispatchTask {
@Autowired @Autowired
private ConnectorService connectorService; private ConnectorService connectorService;
@Override @Override
public TaskResult processClusterTask(ConnectCluster connectCluster, long triggerTimeUnitMs) { public TaskResult processClusterTask(ConnectCluster connectCluster, long triggerTimeUnitMs) {
Result<List<String>> nameListResult = connectorService.listConnectorsFromCluster(connectCluster.getId()); Result<List<String>> nameListResult = connectorService.listConnectorsFromCluster(connectCluster.getId());
@@ -42,7 +41,7 @@ public class SyncConnectorTask extends AbstractAsyncMetadataDispatchTask {
Result<KSConnector> ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(connectCluster.getId(), connectorName); Result<KSConnector> ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(connectCluster.getId(), connectorName);
if (ksConnectorResult.failed()) { if (ksConnectorResult.failed()) {
LOGGER.error( LOGGER.error(
"class=SyncConnectorTask||method=processClusterTask||connectClusterId={}||connectorName={}||result={}", "method=processClusterTask||connectClusterId={}||connectorName={}||result={}",
connectCluster.getId(), connectorName, ksConnectorResult connectCluster.getId(), connectorName, ksConnectorResult
); );
@@ -53,6 +52,9 @@ public class SyncConnectorTask extends AbstractAsyncMetadataDispatchTask {
connectorList.add(ksConnectorResult.getData()); connectorList.add(ksConnectorResult.getData());
} }
//mm2相关信息的添加
connectorService.completeMirrorMakerInfo(connectCluster, connectorList);
connectorService.batchReplace(connectCluster.getKafkaClusterPhyId(), connectCluster.getId(), connectorList, new HashSet<>(nameListResult.getData())); connectorService.batchReplace(connectCluster.getKafkaClusterPhyId(), connectCluster.getId(), connectorList, new HashSet<>(nameListResult.getData()));
return allSuccess? TaskResult.SUCCESS: TaskResult.FAIL; return allSuccess? TaskResult.SUCCESS: TaskResult.FAIL;