diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/connect/ConnectCluster.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/connect/ConnectCluster.java index a4c67bbc..43a6ce21 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/connect/ConnectCluster.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/connect/ConnectCluster.java @@ -1,6 +1,7 @@ package com.xiaojukeji.know.streaming.km.common.bean.entity.connect; import com.xiaojukeji.know.streaming.km.common.bean.entity.EntityIdInterface; +import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import lombok.Data; import java.io.Serializable; @@ -54,6 +55,22 @@ public class ConnectCluster implements Serializable, Comparable, */ private String clusterUrl; + public String getSuitableRequestUrl() { + // 优先使用用户填写的url + String suitableRequestUrl = this.clusterUrl; + if (ValidateUtils.isBlank(suitableRequestUrl)) { + // 用户如果没有填写,则使用元信息中的url + suitableRequestUrl = this.memberLeaderUrl; + } + + //url去斜杠 + if (suitableRequestUrl.length() > 0 && suitableRequestUrl.charAt(suitableRequestUrl.length() - 1) == '/') { + return suitableRequestUrl.substring(0, suitableRequestUrl.length() - 1); + } + + return suitableRequestUrl; + } + @Override public int compareTo(ConnectCluster connectCluster) { return this.id.compareTo(connectCluster.getId()); diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/connect/ConnectClusterPO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/connect/ConnectClusterPO.java index f0a364e6..9175a6c1 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/connect/ConnectClusterPO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/connect/ConnectClusterPO.java @@ -29,7 +29,7 @@ public class ConnectClusterPO extends BasePO { private Integer state; /** - * 集群地址 + * 用户填写的集群地址 */ private String clusterUrl; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/impl/ConnectClusterServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/impl/ConnectClusterServiceImpl.java index 030b78ad..86879662 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/impl/ConnectClusterServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/impl/ConnectClusterServiceImpl.java @@ -40,12 +40,6 @@ public class ConnectClusterServiceImpl implements ConnectClusterService { @Override public Long replaceAndReturnIdInDB(ConnectClusterMetadata metadata) { - //url去斜杠 - String clusterUrl = metadata.getMemberLeaderUrl(); - if (clusterUrl.charAt(clusterUrl.length() - 1) == '/') { - clusterUrl = clusterUrl.substring(0, clusterUrl.length() - 1); - } - ConnectClusterPO oldPO = this.getPOFromDB(metadata.getKafkaClusterPhyId(), metadata.getGroupName()); if (oldPO == null) { oldPO = new ConnectClusterPO(); @@ -54,7 +48,7 @@ public class ConnectClusterServiceImpl implements ConnectClusterService { oldPO.setName(metadata.getGroupName()); oldPO.setState(metadata.getState().getCode()); oldPO.setMemberLeaderUrl(metadata.getMemberLeaderUrl()); - oldPO.setClusterUrl(clusterUrl); + oldPO.setClusterUrl(""); oldPO.setVersion(KafkaConstant.DEFAULT_CONNECT_VERSION); connectClusterDAO.insert(oldPO); @@ -69,11 +63,11 @@ public class ConnectClusterServiceImpl implements ConnectClusterService { if (ValidateUtils.isBlank(oldPO.getVersion())) { oldPO.setVersion(KafkaConstant.DEFAULT_CONNECT_VERSION); } - if (!ValidateUtils.isBlank(clusterUrl)) { - oldPO.setClusterUrl(clusterUrl); + if (ValidateUtils.isNull(oldPO.getClusterUrl())) { + oldPO.setClusterUrl(""); } - connectClusterDAO.updateById(oldPO); + connectClusterDAO.updateById(oldPO); return oldPO.getId(); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/ConnectorServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/ConnectorServiceImpl.java index c042276d..133355a8 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/ConnectorServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/impl/ConnectorServiceImpl.java @@ -87,7 +87,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C props.put("config", configs); ConnectorInfo connectorInfo = restTool.postObjectWithJsonContent( - connectCluster.getClusterUrl() + CREATE_CONNECTOR_URI, + connectCluster.getSuitableRequestUrl() + CREATE_CONNECTOR_URI, props, ConnectorInfo.class ); @@ -127,7 +127,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C } List nameList = restTool.getArrayObjectWithJsonContent( - connectCluster.getClusterUrl() + LIST_CONNECTORS_URI, + connectCluster.getSuitableRequestUrl() + LIST_CONNECTORS_URI, new HashMap<>(), String.class ); @@ -224,7 +224,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C } restTool.putJsonForObject( - connectCluster.getClusterUrl() + String.format(RESUME_CONNECTOR_URI, connectorName), + connectCluster.getSuitableRequestUrl() + String.format(RESUME_CONNECTOR_URI, connectorName), new HashMap<>(), String.class ); @@ -259,7 +259,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C } restTool.postObjectWithJsonContent( - connectCluster.getClusterUrl() + String.format(RESTART_CONNECTOR_URI, connectorName), + connectCluster.getSuitableRequestUrl() + String.format(RESTART_CONNECTOR_URI, connectorName), new HashMap<>(), String.class ); @@ -294,7 +294,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C } restTool.putJsonForObject( - connectCluster.getClusterUrl() + String.format(PAUSE_CONNECTOR_URI, connectorName), + connectCluster.getSuitableRequestUrl() + String.format(PAUSE_CONNECTOR_URI, connectorName), new HashMap<>(), String.class ); @@ -329,7 +329,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C } restTool.deleteWithParamsAndHeader( - connectCluster.getClusterUrl() + String.format(DELETE_CONNECTOR_URI, connectorName), + connectCluster.getSuitableRequestUrl() + String.format(DELETE_CONNECTOR_URI, connectorName), new HashMap<>(), new HashMap<>(), String.class @@ -365,7 +365,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C } ConnectorInfo connectorInfo = restTool.putJsonForObject( - connectCluster.getClusterUrl() + String.format(UPDATE_CONNECTOR_CONFIG_URI, connectorName), + connectCluster.getSuitableRequestUrl() + String.format(UPDATE_CONNECTOR_CONFIG_URI, connectorName), configs, org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo.class ); @@ -532,7 +532,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C private Result getConnectorInfoFromCluster(ConnectCluster connectCluster, String connectorName) { try { ConnectorInfo connectorInfo = restTool.getForObject( - connectCluster.getClusterUrl() + GET_CONNECTOR_INFO_PREFIX_URI + "/" + connectorName, + connectCluster.getSuitableRequestUrl() + GET_CONNECTOR_INFO_PREFIX_URI + "/" + connectorName, new HashMap<>(), ConnectorInfo.class ); @@ -558,7 +558,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C private Result> getConnectorTopicsFromCluster(ConnectCluster connectCluster, String connectorName) { try { Properties properties = restTool.getForObject( - connectCluster.getClusterUrl() + String.format(GET_CONNECTOR_TOPICS_URI, connectorName), + connectCluster.getSuitableRequestUrl() + String.format(GET_CONNECTOR_TOPICS_URI, connectorName), new HashMap<>(), Properties.class ); @@ -578,7 +578,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C private Result getConnectorStateInfoFromCluster(ConnectCluster connectCluster, String connectorName) { try { KSConnectorStateInfo connectorStateInfo = restTool.getForObject( - connectCluster.getClusterUrl() + String.format(GET_CONNECTOR_STATUS_URI, connectorName), + connectCluster.getSuitableRequestUrl() + String.format(GET_CONNECTOR_STATUS_URI, connectorName), new HashMap<>(), KSConnectorStateInfo.class ); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/plugin/impl/PluginServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/plugin/impl/PluginServiceImpl.java index fa6f1394..8ef4d391 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/plugin/impl/PluginServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/plugin/impl/PluginServiceImpl.java @@ -66,7 +66,7 @@ public class PluginServiceImpl extends BaseVersionControlService implements Plug // 通过参数检查接口,获取插件配置 ConfigInfos configInfos = restTool.putJsonForObject( - connectCluster.getClusterUrl() + String.format(GET_PLUGIN_CONFIG_DESC_URI, props.getProperty(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME)), + connectCluster.getSuitableRequestUrl() + String.format(GET_PLUGIN_CONFIG_DESC_URI, props.getProperty(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME)), props, ConfigInfos.class ); @@ -94,7 +94,7 @@ public class PluginServiceImpl extends BaseVersionControlService implements Plug // 通过参数检查接口,获取插件配置 List pluginList = restTool.getArrayObjectWithJsonContent( - connectCluster.getClusterUrl() + GET_ALL_PLUGINS_URI, + connectCluster.getSuitableRequestUrl() + GET_ALL_PLUGINS_URI, new HashMap<>(), ConnectPluginBasic.class ); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/worker/impl/WorkerConnectorServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/worker/impl/WorkerConnectorServiceImpl.java index 99fb9ba2..eb2c80fc 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/worker/impl/WorkerConnectorServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/worker/impl/WorkerConnectorServiceImpl.java @@ -105,7 +105,7 @@ public class WorkerConnectorServiceImpl implements WorkerConnectorService { return Result.buildFailure(ResultStatus.NOT_EXIST); } - String url = String.format(RESTART_TASK_URI, connectCluster.getClusterUrl(), dto.getConnectorName(), dto.getTaskId()); + String url = String.format(RESTART_TASK_URI, connectCluster.getSuitableRequestUrl(), dto.getConnectorName(), dto.getTaskId()); try { restTool.postObjectWithJsonContent(url, null, String.class); } catch (Exception e) { diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncConnectClusterAndWorkerTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncConnectClusterAndWorkerTask.java index cb886eea..646bf6c0 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncConnectClusterAndWorkerTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncConnectClusterAndWorkerTask.java @@ -20,7 +20,6 @@ import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum; import com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.jmx.JmxEnum; import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService; -import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService; import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService; import com.xiaojukeji.know.streaming.km.core.service.group.GroupService; import com.xiaojukeji.know.streaming.km.persistence.connect.cache.LoadedConnectClusterCache; @@ -47,9 +46,6 @@ public class SyncConnectClusterAndWorkerTask extends AbstractAsyncMetadataDispat @Autowired private WorkerService workerService; - @Autowired - private WorkerConnectorService workerConnectorService; - @Autowired private ConnectClusterService connectClusterService; @@ -60,7 +56,6 @@ public class SyncConnectClusterAndWorkerTask extends AbstractAsyncMetadataDispat //获取connect集群 List groupList = groupService.listClusterGroups(clusterPhy.getId()).stream().filter(elem->elem.getType()==GroupTypeEnum.CONNECT_CLUSTER).collect(Collectors.toList()); for (Group group: groupList) { - try { KSGroupDescription ksGroupDescription = groupService.getGroupDescriptionFromKafka(clusterPhy, group.getName()); if (!ksGroupDescription.protocolType().equals(CONNECT_CLUSTER_PROTOCOL_TYPE)) {