diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metadata/SyncConnectorTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metadata/SyncConnectorTask.java index 4d45e7c2..00e58425 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metadata/SyncConnectorTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metadata/SyncConnectorTask.java @@ -27,7 +27,6 @@ public class SyncConnectorTask extends AbstractAsyncMetadataDispatchTask { @Autowired private ConnectorService connectorService; - @Override public TaskResult processClusterTask(ConnectCluster connectCluster, long triggerTimeUnitMs) { Result> nameListResult = connectorService.listConnectorsFromCluster(connectCluster.getId()); @@ -42,7 +41,7 @@ public class SyncConnectorTask extends AbstractAsyncMetadataDispatchTask { Result ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(connectCluster.getId(), connectorName); if (ksConnectorResult.failed()) { LOGGER.error( - "class=SyncConnectorTask||method=processClusterTask||connectClusterId={}||connectorName={}||result={}", + "method=processClusterTask||connectClusterId={}||connectorName={}||result={}", connectCluster.getId(), connectorName, ksConnectorResult ); @@ -53,6 +52,9 @@ public class SyncConnectorTask extends AbstractAsyncMetadataDispatchTask { connectorList.add(ksConnectorResult.getData()); } + //mm2相关信息的添加 + connectorService.completeMirrorMakerInfo(connectCluster, connectorList); + connectorService.batchReplace(connectCluster.getKafkaClusterPhyId(), connectCluster.getId(), connectorList, new HashSet<>(nameListResult.getData())); return allSuccess? TaskResult.SUCCESS: TaskResult.FAIL;