[Bugfix]修复对ZK客户端进行配置后不生效的问题(#694)

1、修复在ks_km_physical_cluster表的zk_properties字段填写ZK 客户端的相关配置后,不生效的问题。
2、删除zk_properties字段中,暂时无需使用的jmxConfig字段。
This commit is contained in:
zengqiao
2023-01-06 13:43:48 +08:00
committed by EricZeng
parent 17e0c39f83
commit 6b3eb05735
2 changed files with 46 additions and 34 deletions

View File

@@ -13,9 +13,6 @@ import java.util.Properties;
*/
@ApiModel(description = "ZK配置")
public class ZKConfig implements Serializable {
@ApiModelProperty(value="ZK的jmx配置")
private JmxConfig jmxConfig;
@ApiModelProperty(value="ZK是否开启secure", example = "false")
private Boolean openSecure = false;
@@ -28,14 +25,6 @@ public class ZKConfig implements Serializable {
@ApiModelProperty(value="ZK的Request超时时间")
private Properties otherProps = new Properties();
public JmxConfig getJmxConfig() {
return jmxConfig == null? new JmxConfig(): jmxConfig;
}
public void setJmxConfig(JmxConfig jmxConfig) {
this.jmxConfig = jmxConfig;
}
public Boolean getOpenSecure() {
return openSecure != null && openSecure;
}
@@ -53,7 +42,7 @@ public class ZKConfig implements Serializable {
}
public Integer getRequestTimeoutUnitMs() {
return requestTimeoutUnitMs == null? Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS: requestTimeoutUnitMs;
return requestTimeoutUnitMs == null? Constant.DEFAULT_SESSION_TIMEOUT_UNIT_MS: requestTimeoutUnitMs;
}
public void setRequestTimeoutUnitMs(Integer requestTimeoutUnitMs) {

View File

@@ -3,9 +3,10 @@ package com.xiaojukeji.know.streaming.km.persistence.kafka;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.constant.Constant;
import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.persistence.AbstractClusterLoadedChangedHandler;
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
@@ -17,11 +18,12 @@ import org.springframework.stereotype.Component;
import scala.Option;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class KafkaAdminZKClient extends AbstractClusterLoadedChangedHandler implements KafkaClient<KafkaZkClient> {
private static final ILog log = LogFactory.getLog(KafkaAdminZKClient.class);
private static final ILog LOGGER = LogFactory.getLog(KafkaAdminZKClient.class);
/**
* Kafka提供的KafkaZkClient
@@ -92,13 +94,13 @@ public class KafkaAdminZKClient extends AbstractClusterLoadedChangedHandler impl
return;
}
log.info("close ZK Client starting, clusterPhyId:{}", clusterPhyId);
LOGGER.info("method=closeZKClient||clusterPhyId={}||msg=close ZK Client starting", clusterPhyId);
kafkaZkClient.close();
log.info("close ZK Client success, clusterPhyId:{}", clusterPhyId);
LOGGER.info("method=closeZKClient||clusterPhyId={}||msg=close ZK Client success", clusterPhyId);
} catch (Exception e) {
log.error("close ZK Client failed, clusterPhyId:{}", clusterPhyId, e);
LOGGER.error("method=closeZKClient||clusterPhyId={}||msg=close ZK Client failed||errMsg=exception!", clusterPhyId, e);
} finally {
modifyClientMapLock.unlock();
}
@@ -107,19 +109,19 @@ public class KafkaAdminZKClient extends AbstractClusterLoadedChangedHandler impl
private KafkaZkClient createZKClient(Long clusterPhyId) throws NotExistException {
ClusterPhy clusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId);
if (clusterPhy == null) {
log.warn("create ZK Client failed, cluster not exist, clusterPhyId:{}", clusterPhyId);
LOGGER.warn("method=createZKClient||clusterPhyId={}||msg=create ZK Client failed, cluster not exist", clusterPhyId);
throw new NotExistException(MsgConstant.getClusterPhyNotExist(clusterPhyId));
}
if (ValidateUtils.isBlank(clusterPhy.getZookeeper())) {
log.warn("create ZK Client failed, zookeeper not exist, clusterPhyId:{}", clusterPhyId);
LOGGER.warn("method=createZKClient||clusterPhyId={}||msg=create ZK Client failed, zookeeper not exist", clusterPhyId);
return null;
}
return this.createZKClient(clusterPhyId, clusterPhy.getZookeeper());
return this.createZKClient(clusterPhyId, clusterPhy);
}
private KafkaZkClient createZKClient(Long clusterPhyId, String zookeeperAddress) {
private KafkaZkClient createZKClient(Long clusterPhyId, ClusterPhy clusterPhy) {
try {
modifyClientMapLock.lock();
@@ -128,33 +130,54 @@ public class KafkaAdminZKClient extends AbstractClusterLoadedChangedHandler impl
return kafkaZkClient;
}
log.debug("create ZK Client starting, clusterPhyId:{} zookeeperAddress:{}", clusterPhyId, zookeeperAddress);
ZKConfig zkConfig = this.getZKConfig(clusterPhy);
LOGGER.debug("method=createZKClient||clusterPhyId={}||clusterPhy={}||msg=create ZK Client starting", clusterPhyId, clusterPhy);
kafkaZkClient = KafkaZkClient.apply(
zookeeperAddress,
false,
// 添加支持zk的Kerberos认证
// true,
Constant.DEFAULT_SESSION_TIMEOUT_UNIT_MS,
Constant.DEFAULT_SESSION_TIMEOUT_UNIT_MS,
clusterPhy.getZookeeper(),
zkConfig.getOpenSecure(),
zkConfig.getSessionTimeoutUnitMs(),
zkConfig.getRequestTimeoutUnitMs(),
5,
Time.SYSTEM,
"KnowStreaming-clusterPhyId-" + clusterPhyId,
"SessionExpireListener",
Option.apply("KnowStreaming-clusterPhyId-" + clusterPhyId),
Option.apply(new ZKClientConfig())
"KS-ZK-ClusterPhyId-" + clusterPhyId,
"KS-ZK-SessionExpireListener-clusterPhyId-" + clusterPhyId,
Option.apply("KS-ZK-ClusterPhyId-" + clusterPhyId),
Option.apply(this.getZKConfig(clusterPhyId, zkConfig.getOtherProps()))
);
KAFKA_ZK_CLIENT_MAP.put(clusterPhyId, kafkaZkClient);
KAFKA_ZK_CLIENT_CREATE_TIME.put(clusterPhyId, System.currentTimeMillis());
log.info("create ZK Client success, clusterPhyId:{}", clusterPhyId);
LOGGER.info("method=createZKClient||clusterPhyId={}||msg=create ZK Client success", clusterPhyId);
} catch (Exception e) {
log.error("create ZK Client failed, clusterPhyId:{} zookeeperAddress:{}", clusterPhyId, zookeeperAddress, e);
LOGGER.error("method=createZKClient||clusterPhyId={}||clusterPhy={}||msg=create ZK Client failed||errMsg=exception", clusterPhyId, clusterPhy, e);
} finally {
modifyClientMapLock.unlock();
}
return KAFKA_ZK_CLIENT_MAP.get(clusterPhyId);
}
private ZKConfig getZKConfig(ClusterPhy clusterPhy) {
ZKConfig zkConfig = ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class);
if (zkConfig == null) {
return new ZKConfig();
}
return zkConfig;
}
private ZKClientConfig getZKConfig(Long clusterPhyId, Properties props) {
ZKClientConfig zkClientConfig = new ZKClientConfig();
try {
props.entrySet().forEach(elem -> zkClientConfig.setProperty((String) elem.getKey(), (String) elem.getValue()));
} catch (Exception e) {
LOGGER.error("method=getZKConfig||clusterPhyId={}||props={}||errMsg=exception", clusterPhyId, props);
}
return zkClientConfig;
}
}