diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/ZKConfig.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/ZKConfig.java index 66a727e5..f0fe41d0 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/ZKConfig.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/config/ZKConfig.java @@ -13,9 +13,6 @@ import java.util.Properties; */ @ApiModel(description = "ZK配置") public class ZKConfig implements Serializable { - @ApiModelProperty(value="ZK的jmx配置") - private JmxConfig jmxConfig; - @ApiModelProperty(value="ZK是否开启secure", example = "false") private Boolean openSecure = false; @@ -28,14 +25,6 @@ public class ZKConfig implements Serializable { @ApiModelProperty(value="ZK的Request超时时间") private Properties otherProps = new Properties(); - public JmxConfig getJmxConfig() { - return jmxConfig == null? new JmxConfig(): jmxConfig; - } - - public void setJmxConfig(JmxConfig jmxConfig) { - this.jmxConfig = jmxConfig; - } - public Boolean getOpenSecure() { return openSecure != null && openSecure; } @@ -53,7 +42,7 @@ public class ZKConfig implements Serializable { } public Integer getRequestTimeoutUnitMs() { - return requestTimeoutUnitMs == null? Constant.DEFAULT_REQUEST_TIMEOUT_UNIT_MS: requestTimeoutUnitMs; + return requestTimeoutUnitMs == null? Constant.DEFAULT_SESSION_TIMEOUT_UNIT_MS: requestTimeoutUnitMs; } public void setRequestTimeoutUnitMs(Integer requestTimeoutUnitMs) { diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaAdminZKClient.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaAdminZKClient.java index e6275a60..48b26eb2 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaAdminZKClient.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/KafkaAdminZKClient.java @@ -3,9 +3,10 @@ package com.xiaojukeji.know.streaming.km.persistence.kafka; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; -import com.xiaojukeji.know.streaming.km.common.constant.Constant; +import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.persistence.AbstractClusterLoadedChangedHandler; import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache; @@ -17,11 +18,12 @@ import org.springframework.stereotype.Component; import scala.Option; import java.util.Map; +import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; @Component public class KafkaAdminZKClient extends AbstractClusterLoadedChangedHandler implements KafkaClient { - private static final ILog log = LogFactory.getLog(KafkaAdminZKClient.class); + private static final ILog LOGGER = LogFactory.getLog(KafkaAdminZKClient.class); /** * Kafka提供的KafkaZkClient @@ -92,13 +94,13 @@ public class KafkaAdminZKClient extends AbstractClusterLoadedChangedHandler impl return; } - log.info("close ZK Client starting, clusterPhyId:{}", clusterPhyId); + LOGGER.info("method=closeZKClient||clusterPhyId={}||msg=close ZK Client starting", clusterPhyId); kafkaZkClient.close(); - log.info("close ZK Client success, clusterPhyId:{}", clusterPhyId); + LOGGER.info("method=closeZKClient||clusterPhyId={}||msg=close ZK Client success", clusterPhyId); } catch (Exception e) { - log.error("close ZK Client failed, clusterPhyId:{}", clusterPhyId, e); + LOGGER.error("method=closeZKClient||clusterPhyId={}||msg=close ZK Client failed||errMsg=exception!", clusterPhyId, e); } finally { modifyClientMapLock.unlock(); } @@ -107,19 +109,19 @@ public class KafkaAdminZKClient extends AbstractClusterLoadedChangedHandler impl private KafkaZkClient createZKClient(Long clusterPhyId) throws NotExistException { ClusterPhy clusterPhy = LoadedClusterPhyCache.getByPhyId(clusterPhyId); if (clusterPhy == null) { - log.warn("create ZK Client failed, cluster not exist, clusterPhyId:{}", clusterPhyId); + LOGGER.warn("method=createZKClient||clusterPhyId={}||msg=create ZK Client failed, cluster not exist", clusterPhyId); throw new NotExistException(MsgConstant.getClusterPhyNotExist(clusterPhyId)); } if (ValidateUtils.isBlank(clusterPhy.getZookeeper())) { - log.warn("create ZK Client failed, zookeeper not exist, clusterPhyId:{}", clusterPhyId); + LOGGER.warn("method=createZKClient||clusterPhyId={}||msg=create ZK Client failed, zookeeper not exist", clusterPhyId); return null; } - return this.createZKClient(clusterPhyId, clusterPhy.getZookeeper()); + return this.createZKClient(clusterPhyId, clusterPhy); } - private KafkaZkClient createZKClient(Long clusterPhyId, String zookeeperAddress) { + private KafkaZkClient createZKClient(Long clusterPhyId, ClusterPhy clusterPhy) { try { modifyClientMapLock.lock(); @@ -128,33 +130,54 @@ public class KafkaAdminZKClient extends AbstractClusterLoadedChangedHandler impl return kafkaZkClient; } - log.debug("create ZK Client starting, clusterPhyId:{} zookeeperAddress:{}", clusterPhyId, zookeeperAddress); + ZKConfig zkConfig = this.getZKConfig(clusterPhy); + + LOGGER.debug("method=createZKClient||clusterPhyId={}||clusterPhy={}||msg=create ZK Client starting", clusterPhyId, clusterPhy); kafkaZkClient = KafkaZkClient.apply( - zookeeperAddress, - false, -// 添加支持zk的Kerberos认证 -// true, - Constant.DEFAULT_SESSION_TIMEOUT_UNIT_MS, - Constant.DEFAULT_SESSION_TIMEOUT_UNIT_MS, + clusterPhy.getZookeeper(), + zkConfig.getOpenSecure(), + zkConfig.getSessionTimeoutUnitMs(), + zkConfig.getRequestTimeoutUnitMs(), 5, Time.SYSTEM, - "KnowStreaming-clusterPhyId-" + clusterPhyId, - "SessionExpireListener", - Option.apply("KnowStreaming-clusterPhyId-" + clusterPhyId), - Option.apply(new ZKClientConfig()) + "KS-ZK-ClusterPhyId-" + clusterPhyId, + "KS-ZK-SessionExpireListener-clusterPhyId-" + clusterPhyId, + Option.apply("KS-ZK-ClusterPhyId-" + clusterPhyId), + Option.apply(this.getZKConfig(clusterPhyId, zkConfig.getOtherProps())) ); KAFKA_ZK_CLIENT_MAP.put(clusterPhyId, kafkaZkClient); KAFKA_ZK_CLIENT_CREATE_TIME.put(clusterPhyId, System.currentTimeMillis()); - log.info("create ZK Client success, clusterPhyId:{}", clusterPhyId); + LOGGER.info("method=createZKClient||clusterPhyId={}||msg=create ZK Client success", clusterPhyId); } catch (Exception e) { - log.error("create ZK Client failed, clusterPhyId:{} zookeeperAddress:{}", clusterPhyId, zookeeperAddress, e); + LOGGER.error("method=createZKClient||clusterPhyId={}||clusterPhy={}||msg=create ZK Client failed||errMsg=exception", clusterPhyId, clusterPhy, e); } finally { modifyClientMapLock.unlock(); } return KAFKA_ZK_CLIENT_MAP.get(clusterPhyId); } + + private ZKConfig getZKConfig(ClusterPhy clusterPhy) { + ZKConfig zkConfig = ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class); + if (zkConfig == null) { + return new ZKConfig(); + } + + return zkConfig; + } + + private ZKClientConfig getZKConfig(Long clusterPhyId, Properties props) { + ZKClientConfig zkClientConfig = new ZKClientConfig(); + + try { + props.entrySet().forEach(elem -> zkClientConfig.setProperty((String) elem.getKey(), (String) elem.getValue())); + } catch (Exception e) { + LOGGER.error("method=getZKConfig||clusterPhyId={}||props={}||errMsg=exception", clusterPhyId, props); + } + + return zkClientConfig; + } }