[Optimize]去除对Connect集群的clusterUrl的动态更新 (#1079)

问题:
clusterUrl动态更新可能会获取到错误的地址,导致请求connect集群相关信息失败;

解决:
去除动态更新,仅支持用户输入;

遗留:
前端需要支持用户输入;
This commit is contained in:
EricZeng
2023-07-05 11:55:16 +08:00
committed by GitHub
parent abaadfb9a8
commit 0cd071c5c6
7 changed files with 35 additions and 29 deletions

View File

@@ -1,6 +1,7 @@
package com.xiaojukeji.know.streaming.km.common.bean.entity.connect; package com.xiaojukeji.know.streaming.km.common.bean.entity.connect;
import com.xiaojukeji.know.streaming.km.common.bean.entity.EntityIdInterface; import com.xiaojukeji.know.streaming.km.common.bean.entity.EntityIdInterface;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import lombok.Data; import lombok.Data;
import java.io.Serializable; import java.io.Serializable;
@@ -54,6 +55,22 @@ public class ConnectCluster implements Serializable, Comparable<ConnectCluster>,
*/ */
private String clusterUrl; private String clusterUrl;
public String getSuitableRequestUrl() {
// 优先使用用户填写的url
String suitableRequestUrl = this.clusterUrl;
if (ValidateUtils.isBlank(suitableRequestUrl)) {
// 用户如果没有填写则使用元信息中的url
suitableRequestUrl = this.memberLeaderUrl;
}
//url去斜杠
if (suitableRequestUrl.length() > 0 && suitableRequestUrl.charAt(suitableRequestUrl.length() - 1) == '/') {
return suitableRequestUrl.substring(0, suitableRequestUrl.length() - 1);
}
return suitableRequestUrl;
}
@Override @Override
public int compareTo(ConnectCluster connectCluster) { public int compareTo(ConnectCluster connectCluster) {
return this.id.compareTo(connectCluster.getId()); return this.id.compareTo(connectCluster.getId());

View File

@@ -29,7 +29,7 @@ public class ConnectClusterPO extends BasePO {
private Integer state; private Integer state;
/** /**
* 集群地址 * 用户填写的集群地址
*/ */
private String clusterUrl; private String clusterUrl;

View File

@@ -40,12 +40,6 @@ public class ConnectClusterServiceImpl implements ConnectClusterService {
@Override @Override
public Long replaceAndReturnIdInDB(ConnectClusterMetadata metadata) { public Long replaceAndReturnIdInDB(ConnectClusterMetadata metadata) {
//url去斜杠
String clusterUrl = metadata.getMemberLeaderUrl();
if (clusterUrl.charAt(clusterUrl.length() - 1) == '/') {
clusterUrl = clusterUrl.substring(0, clusterUrl.length() - 1);
}
ConnectClusterPO oldPO = this.getPOFromDB(metadata.getKafkaClusterPhyId(), metadata.getGroupName()); ConnectClusterPO oldPO = this.getPOFromDB(metadata.getKafkaClusterPhyId(), metadata.getGroupName());
if (oldPO == null) { if (oldPO == null) {
oldPO = new ConnectClusterPO(); oldPO = new ConnectClusterPO();
@@ -54,7 +48,7 @@ public class ConnectClusterServiceImpl implements ConnectClusterService {
oldPO.setName(metadata.getGroupName()); oldPO.setName(metadata.getGroupName());
oldPO.setState(metadata.getState().getCode()); oldPO.setState(metadata.getState().getCode());
oldPO.setMemberLeaderUrl(metadata.getMemberLeaderUrl()); oldPO.setMemberLeaderUrl(metadata.getMemberLeaderUrl());
oldPO.setClusterUrl(clusterUrl); oldPO.setClusterUrl("");
oldPO.setVersion(KafkaConstant.DEFAULT_CONNECT_VERSION); oldPO.setVersion(KafkaConstant.DEFAULT_CONNECT_VERSION);
connectClusterDAO.insert(oldPO); connectClusterDAO.insert(oldPO);
@@ -69,11 +63,11 @@ public class ConnectClusterServiceImpl implements ConnectClusterService {
if (ValidateUtils.isBlank(oldPO.getVersion())) { if (ValidateUtils.isBlank(oldPO.getVersion())) {
oldPO.setVersion(KafkaConstant.DEFAULT_CONNECT_VERSION); oldPO.setVersion(KafkaConstant.DEFAULT_CONNECT_VERSION);
} }
if (!ValidateUtils.isBlank(clusterUrl)) { if (ValidateUtils.isNull(oldPO.getClusterUrl())) {
oldPO.setClusterUrl(clusterUrl); oldPO.setClusterUrl("");
} }
connectClusterDAO.updateById(oldPO);
connectClusterDAO.updateById(oldPO);
return oldPO.getId(); return oldPO.getId();
} }

View File

@@ -87,7 +87,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C
props.put("config", configs); props.put("config", configs);
ConnectorInfo connectorInfo = restTool.postObjectWithJsonContent( ConnectorInfo connectorInfo = restTool.postObjectWithJsonContent(
connectCluster.getClusterUrl() + CREATE_CONNECTOR_URI, connectCluster.getSuitableRequestUrl() + CREATE_CONNECTOR_URI,
props, props,
ConnectorInfo.class ConnectorInfo.class
); );
@@ -127,7 +127,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C
} }
List<String> nameList = restTool.getArrayObjectWithJsonContent( List<String> nameList = restTool.getArrayObjectWithJsonContent(
connectCluster.getClusterUrl() + LIST_CONNECTORS_URI, connectCluster.getSuitableRequestUrl() + LIST_CONNECTORS_URI,
new HashMap<>(), new HashMap<>(),
String.class String.class
); );
@@ -224,7 +224,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C
} }
restTool.putJsonForObject( restTool.putJsonForObject(
connectCluster.getClusterUrl() + String.format(RESUME_CONNECTOR_URI, connectorName), connectCluster.getSuitableRequestUrl() + String.format(RESUME_CONNECTOR_URI, connectorName),
new HashMap<>(), new HashMap<>(),
String.class String.class
); );
@@ -259,7 +259,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C
} }
restTool.postObjectWithJsonContent( restTool.postObjectWithJsonContent(
connectCluster.getClusterUrl() + String.format(RESTART_CONNECTOR_URI, connectorName), connectCluster.getSuitableRequestUrl() + String.format(RESTART_CONNECTOR_URI, connectorName),
new HashMap<>(), new HashMap<>(),
String.class String.class
); );
@@ -294,7 +294,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C
} }
restTool.putJsonForObject( restTool.putJsonForObject(
connectCluster.getClusterUrl() + String.format(PAUSE_CONNECTOR_URI, connectorName), connectCluster.getSuitableRequestUrl() + String.format(PAUSE_CONNECTOR_URI, connectorName),
new HashMap<>(), new HashMap<>(),
String.class String.class
); );
@@ -329,7 +329,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C
} }
restTool.deleteWithParamsAndHeader( restTool.deleteWithParamsAndHeader(
connectCluster.getClusterUrl() + String.format(DELETE_CONNECTOR_URI, connectorName), connectCluster.getSuitableRequestUrl() + String.format(DELETE_CONNECTOR_URI, connectorName),
new HashMap<>(), new HashMap<>(),
new HashMap<>(), new HashMap<>(),
String.class String.class
@@ -365,7 +365,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C
} }
ConnectorInfo connectorInfo = restTool.putJsonForObject( ConnectorInfo connectorInfo = restTool.putJsonForObject(
connectCluster.getClusterUrl() + String.format(UPDATE_CONNECTOR_CONFIG_URI, connectorName), connectCluster.getSuitableRequestUrl() + String.format(UPDATE_CONNECTOR_CONFIG_URI, connectorName),
configs, configs,
org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo.class org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo.class
); );
@@ -532,7 +532,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C
private Result<KSConnectorInfo> getConnectorInfoFromCluster(ConnectCluster connectCluster, String connectorName) { private Result<KSConnectorInfo> getConnectorInfoFromCluster(ConnectCluster connectCluster, String connectorName) {
try { try {
ConnectorInfo connectorInfo = restTool.getForObject( ConnectorInfo connectorInfo = restTool.getForObject(
connectCluster.getClusterUrl() + GET_CONNECTOR_INFO_PREFIX_URI + "/" + connectorName, connectCluster.getSuitableRequestUrl() + GET_CONNECTOR_INFO_PREFIX_URI + "/" + connectorName,
new HashMap<>(), new HashMap<>(),
ConnectorInfo.class ConnectorInfo.class
); );
@@ -558,7 +558,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C
private Result<List<String>> getConnectorTopicsFromCluster(ConnectCluster connectCluster, String connectorName) { private Result<List<String>> getConnectorTopicsFromCluster(ConnectCluster connectCluster, String connectorName) {
try { try {
Properties properties = restTool.getForObject( Properties properties = restTool.getForObject(
connectCluster.getClusterUrl() + String.format(GET_CONNECTOR_TOPICS_URI, connectorName), connectCluster.getSuitableRequestUrl() + String.format(GET_CONNECTOR_TOPICS_URI, connectorName),
new HashMap<>(), new HashMap<>(),
Properties.class Properties.class
); );
@@ -578,7 +578,7 @@ public class ConnectorServiceImpl extends BaseVersionControlService implements C
private Result<KSConnectorStateInfo> getConnectorStateInfoFromCluster(ConnectCluster connectCluster, String connectorName) { private Result<KSConnectorStateInfo> getConnectorStateInfoFromCluster(ConnectCluster connectCluster, String connectorName) {
try { try {
KSConnectorStateInfo connectorStateInfo = restTool.getForObject( KSConnectorStateInfo connectorStateInfo = restTool.getForObject(
connectCluster.getClusterUrl() + String.format(GET_CONNECTOR_STATUS_URI, connectorName), connectCluster.getSuitableRequestUrl() + String.format(GET_CONNECTOR_STATUS_URI, connectorName),
new HashMap<>(), new HashMap<>(),
KSConnectorStateInfo.class KSConnectorStateInfo.class
); );

View File

@@ -66,7 +66,7 @@ public class PluginServiceImpl extends BaseVersionControlService implements Plug
// 通过参数检查接口,获取插件配置 // 通过参数检查接口,获取插件配置
ConfigInfos configInfos = restTool.putJsonForObject( ConfigInfos configInfos = restTool.putJsonForObject(
connectCluster.getClusterUrl() + String.format(GET_PLUGIN_CONFIG_DESC_URI, props.getProperty(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME)), connectCluster.getSuitableRequestUrl() + String.format(GET_PLUGIN_CONFIG_DESC_URI, props.getProperty(KafkaConnectConstant.CONNECTOR_CLASS_FILED_NAME)),
props, props,
ConfigInfos.class ConfigInfos.class
); );
@@ -94,7 +94,7 @@ public class PluginServiceImpl extends BaseVersionControlService implements Plug
// 通过参数检查接口,获取插件配置 // 通过参数检查接口,获取插件配置
List<ConnectPluginBasic> pluginList = restTool.getArrayObjectWithJsonContent( List<ConnectPluginBasic> pluginList = restTool.getArrayObjectWithJsonContent(
connectCluster.getClusterUrl() + GET_ALL_PLUGINS_URI, connectCluster.getSuitableRequestUrl() + GET_ALL_PLUGINS_URI,
new HashMap<>(), new HashMap<>(),
ConnectPluginBasic.class ConnectPluginBasic.class
); );

View File

@@ -105,7 +105,7 @@ public class WorkerConnectorServiceImpl implements WorkerConnectorService {
return Result.buildFailure(ResultStatus.NOT_EXIST); return Result.buildFailure(ResultStatus.NOT_EXIST);
} }
String url = String.format(RESTART_TASK_URI, connectCluster.getClusterUrl(), dto.getConnectorName(), dto.getTaskId()); String url = String.format(RESTART_TASK_URI, connectCluster.getSuitableRequestUrl(), dto.getConnectorName(), dto.getTaskId());
try { try {
restTool.postObjectWithJsonContent(url, null, String.class); restTool.postObjectWithJsonContent(url, null, String.class);
} catch (Exception e) { } catch (Exception e) {

View File

@@ -20,7 +20,6 @@ import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum;
import com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.jmx.JmxEnum; import com.xiaojukeji.know.streaming.km.common.enums.jmx.JmxEnum;
import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService; import com.xiaojukeji.know.streaming.km.core.service.connect.cluster.ConnectClusterService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerConnectorService;
import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService; import com.xiaojukeji.know.streaming.km.core.service.connect.worker.WorkerService;
import com.xiaojukeji.know.streaming.km.core.service.group.GroupService; import com.xiaojukeji.know.streaming.km.core.service.group.GroupService;
import com.xiaojukeji.know.streaming.km.persistence.connect.cache.LoadedConnectClusterCache; import com.xiaojukeji.know.streaming.km.persistence.connect.cache.LoadedConnectClusterCache;
@@ -47,9 +46,6 @@ public class SyncConnectClusterAndWorkerTask extends AbstractAsyncMetadataDispat
@Autowired @Autowired
private WorkerService workerService; private WorkerService workerService;
@Autowired
private WorkerConnectorService workerConnectorService;
@Autowired @Autowired
private ConnectClusterService connectClusterService; private ConnectClusterService connectClusterService;
@@ -60,7 +56,6 @@ public class SyncConnectClusterAndWorkerTask extends AbstractAsyncMetadataDispat
//获取connect集群 //获取connect集群
List<Group> groupList = groupService.listClusterGroups(clusterPhy.getId()).stream().filter(elem->elem.getType()==GroupTypeEnum.CONNECT_CLUSTER).collect(Collectors.toList()); List<Group> groupList = groupService.listClusterGroups(clusterPhy.getId()).stream().filter(elem->elem.getType()==GroupTypeEnum.CONNECT_CLUSTER).collect(Collectors.toList());
for (Group group: groupList) { for (Group group: groupList) {
try { try {
KSGroupDescription ksGroupDescription = groupService.getGroupDescriptionFromKafka(clusterPhy, group.getName()); KSGroupDescription ksGroupDescription = groupService.getGroupDescriptionFromKafka(clusterPhy, group.getName());
if (!ksGroupDescription.protocolType().equals(CONNECT_CLUSTER_PROTOCOL_TYPE)) { if (!ksGroupDescription.protocolType().equals(CONNECT_CLUSTER_PROTOCOL_TYPE)) {