From fbcf58e19cb27ff2961bdbcf7954af9b727fdcb2 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Thu, 9 Feb 2023 16:57:26 +0800 Subject: [PATCH] =?UTF-8?q?[Feature]MM2=E7=AE=A1=E7=90=86-Connector?= =?UTF-8?q?=E5=85=83=E4=BF=A1=E6=81=AF=E7=AE=A1=E7=90=86=E4=BC=98=E5=8C=96?= =?UTF-8?q?(#894)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/task/connect/metadata/SyncConnectorTask.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metadata/SyncConnectorTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metadata/SyncConnectorTask.java index 4d45e7c2..00e58425 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metadata/SyncConnectorTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/connect/metadata/SyncConnectorTask.java @@ -27,7 +27,6 @@ public class SyncConnectorTask extends AbstractAsyncMetadataDispatchTask { @Autowired private ConnectorService connectorService; - @Override public TaskResult processClusterTask(ConnectCluster connectCluster, long triggerTimeUnitMs) { Result> nameListResult = connectorService.listConnectorsFromCluster(connectCluster.getId()); @@ -42,7 +41,7 @@ public class SyncConnectorTask extends AbstractAsyncMetadataDispatchTask { Result ksConnectorResult = connectorService.getAllConnectorInfoFromCluster(connectCluster.getId(), connectorName); if (ksConnectorResult.failed()) { LOGGER.error( - "class=SyncConnectorTask||method=processClusterTask||connectClusterId={}||connectorName={}||result={}", + "method=processClusterTask||connectClusterId={}||connectorName={}||result={}", connectCluster.getId(), connectorName, ksConnectorResult ); @@ -53,6 +52,9 @@ public class SyncConnectorTask extends AbstractAsyncMetadataDispatchTask { connectorList.add(ksConnectorResult.getData()); } + //mm2相关信息的添加 + connectorService.completeMirrorMakerInfo(connectCluster, connectorList); + connectorService.batchReplace(connectCluster.getKafkaClusterPhyId(), connectCluster.getId(), connectorList, new HashSet<>(nameListResult.getData())); return allSuccess? TaskResult.SUCCESS: TaskResult.FAIL;