1、调整KafkaZKDao位置;2、offset信息获取时,过滤掉无leader分区;3、调整验证ZK是否合法时的session超时时间

This commit is contained in:
zengqiao
2022-09-22 11:30:46 +08:00
parent 2b76358c8f
commit 725ac10c3d
27 changed files with 74 additions and 59 deletions

View File

@@ -0,0 +1,4 @@
/**
* 读取Kafka在ZK中存储的数据的包
*/
package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper;

View File

@@ -1,4 +1,4 @@
package com.xiaojukeji.know.streaming.km.persistence.zk;
package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service;
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController;

View File

@@ -1,4 +1,4 @@
package com.xiaojukeji.know.streaming.km.persistence.zk.impl;
package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.impl;
import com.alibaba.fastjson.JSON;
import com.didiglobal.logi.log.ILog;
@@ -11,11 +11,11 @@ import com.xiaojukeji.know.streaming.km.common.enums.topic.TopicTypeEnum;
import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
import com.xiaojukeji.know.streaming.km.common.exception.NotExistException;
import com.xiaojukeji.know.streaming.km.common.utils.Tuple;
import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.ControllerData;
import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.BrokerMetadata;
import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.PartitionMap;
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.ControllerData;
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.BrokerMetadata;
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.PartitionMap;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient;
import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO;
import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO;
import kafka.utils.Json;
import kafka.zk.*;
import kafka.zookeeper.AsyncResponse;
@@ -46,14 +46,14 @@ public class KafkaZKDAOImpl implements KafkaZKDAO {
public Broker getBrokerMetadata(String zkAddress) throws KeeperException.NoNodeException, AdminOperateException {
ZooKeeper zooKeeper = null;
try {
zooKeeper = new ZooKeeper(zkAddress, 1000, watchedEvent -> logger.info(" receive event : " + watchedEvent.getType().name()));
zooKeeper = new ZooKeeper(zkAddress, 3000, watchedEvent -> logger.info(" receive event : " + watchedEvent.getType().name()));
List<String> brokerIdList = this.getChildren(zooKeeper, BrokerIdsZNode.path());
if (brokerIdList == null || brokerIdList.isEmpty()) {
return null;
}
BrokerMetadata brokerMetadata = this.getData(zooKeeper, BrokerIdZNode.path(Integer.parseInt(brokerIdList.get(0))), false, BrokerMetadata.class);
return Broker.buildFrom(null, Integer.valueOf(brokerIdList.get(0)), brokerMetadata);
return this.convert2Broker(null, Integer.valueOf(brokerIdList.get(0)), brokerMetadata);
} catch (KeeperException.NoNodeException nne) {
logger.warn("method=getBrokerMetadata||zkAddress={}||errMsg=exception", zkAddress, nne);
throw nne;
@@ -79,7 +79,7 @@ public class KafkaZKDAOImpl implements KafkaZKDAO {
try {
BrokerMetadata metadata = this.getData(kafkaZkClient.currentZooKeeper(), BrokerIdZNode.path(brokerId), false, BrokerMetadata.class);
BrokerMetadata.parseAndUpdateBrokerMetadata(metadata);
return Broker.buildFrom(clusterPhyId, brokerId, metadata);
return this.convert2Broker(clusterPhyId, brokerId, metadata);
} catch (KeeperException ke) {
logger.error("method=getBrokerMetadata||clusterPhyId={}||brokerId={}||errMsg=exception", clusterPhyId, brokerId, ke);
throw ke;
@@ -269,4 +269,18 @@ public class KafkaZKDAOImpl implements KafkaZKDAO {
byte[] bytes = zooKeeper.getData(path, addWatch, null);
return JSON.parseObject(bytes, clazz);
}
private Broker convert2Broker(Long clusterPhyId, Integer brokerId, BrokerMetadata brokerMetadata) {
Broker metadata = new Broker();
metadata.setClusterPhyId(clusterPhyId);
metadata.setBrokerId(brokerId);
metadata.setHost(brokerMetadata.getHost());
metadata.setPort(brokerMetadata.getPort());
metadata.setJmxPort(brokerMetadata.getJmxPort());
metadata.setStartTimestamp(brokerMetadata.getTimestamp());
metadata.setRack(brokerMetadata.getRack());
metadata.setStatus(1);
metadata.setEndpointMap(brokerMetadata.getEndpointMap());
return metadata;
}
}

View File

@@ -0,0 +1,22 @@
package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* @author zengqiao
* @date 19/4/22
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class ControllerData {
private Integer brokerid;
private Integer version;
private Long timestamp;
}

View File

@@ -0,0 +1,129 @@
package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.xiaojukeji.know.streaming.km.common.bean.entity.common.IpPortData;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil;
import lombok.Data;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author zengqiao
* @date 19/4/3
*
* 存储Broker的元信息, 元信息对应的ZK节点是/brokers/ids/{brokerId}
* 节点结构:
* {
* "listener_security_protocol_map":{"SASL_PLAINTEXT":"SASL_PLAINTEXT"},
* "endpoints":["SASL_PLAINTEXT://10.179.162.202:9093"],
* "jmx_port":9999,
* "host":null,
* "timestamp":"1546632983233",
* "port":-1,
* "version":4,
* "rack": "CY"
* }
*
* {
* "listener_security_protocol_map":{"SASL_PLAINTEXT":"SASL_PLAINTEXT","PLAINTEXT":"PLAINTEXT"},
* "endpoints":["SASL_PLAINTEXT://10.179.162.202:9093","PLAINTEXT://10.179.162.202:9092"],
* "jmx_port":8099,
* "host":"10.179.162.202",
* "timestamp":"1628833925822",
* "port":9092,
* "version":4
* }
*
* {
* "listener_security_protocol_map":{"EXTERNAL":"SASL_PLAINTEXT","INTERNAL":"SASL_PLAINTEXT"},
* "endpoints":["EXTERNAL://10.179.162.202:7092","INTERNAL://10.179.162.202:7093"],
* "jmx_port":8099,
* "host":null,
* "timestamp":"1627289710439",
* "port":-1,
* "version":4
* }
*
*/
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class BrokerMetadata implements Serializable {
private static final long serialVersionUID = 3918113492423375809L;
private List<String> endpoints;
// <EXTERNAL|INTERNAL, <ip, port>>
private Map<String, IpPortData> endpointMap;
private String host;
private Integer port;
@JsonProperty("jmx_port")
private Integer jmxPort;
private Integer version;
private Long timestamp;
private String rack;
@JsonIgnore
public String getExternalHost() {
if (!endpointMap.containsKey(KafkaConstant.EXTERNAL_KEY)) {
// external如果不存在就返回host
return host;
}
return endpointMap.get(KafkaConstant.EXTERNAL_KEY).getIp();
}
@JsonIgnore
public String getInternalHost() {
if (!endpointMap.containsKey(KafkaConstant.INTERNAL_KEY)) {
// internal如果不存在就返回host
return host;
}
return endpointMap.get(KafkaConstant.INTERNAL_KEY).getIp();
}
public static void parseAndUpdateBrokerMetadata(BrokerMetadata brokerMetadata) {
brokerMetadata.setEndpointMap(new HashMap<>());
if (brokerMetadata.getEndpoints().isEmpty()) {
return;
}
// example EXTERNAL://10.179.162.202:7092
for (String endpoint: brokerMetadata.getEndpoints()) {
int idx1 = endpoint.indexOf("://");
int idx2 = endpoint.lastIndexOf(":");
if (idx1 == -1 || idx2 == -1 || idx1 == idx2) {
continue;
}
String brokerHost = endpoint.substring(idx1 + "://".length(), idx2);
String brokerPort = endpoint.substring(idx2 + 1);
brokerMetadata.getEndpointMap().put(endpoint.substring(0, idx1), new IpPortData(brokerHost, brokerPort));
if (KafkaConstant.INTERNAL_KEY.equals(endpoint.substring(0, idx1))) {
// 优先使用internal的地址进行展示
brokerMetadata.setHost(brokerHost);
brokerMetadata.setPort(ConvertUtil.string2Integer(brokerPort));
}
if (null == brokerMetadata.getHost()) {
brokerMetadata.setHost(brokerHost);
brokerMetadata.setPort(ConvertUtil.string2Integer(brokerPort));
}
}
}
}

View File

@@ -0,0 +1,41 @@
package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
* 根据/brokers/topics/topic的节点内容定义
* @author tukun
* @date 2015/11/10.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class PartitionMap implements Serializable {
/**
* 版本号
*/
private int version;
/**
* Map<PartitionId副本所在的brokerId列表>
*/
private Map<Integer, List<Integer>> partitions;
public List<Integer> getPartitionAssignReplicas(Integer partitionId) {
if (partitions == null) {
return new ArrayList<>();
}
return partitions.getOrDefault(partitionId, new ArrayList<>());
}
}

View File

@@ -0,0 +1,55 @@
package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;
import java.util.List;
/**
* PartitionState信息对应ZK/brokers/topics/__consumer_offsets/partitions/0/state 节点
* 该节点的数据结构:
* "{\"controller_epoch\":7,\"leader\":2,\"version\":1,\"leader_epoch\":7,\"isr\":[2,0]}"
* @author tukun
* @date 2015/11/10.
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class PartitionState implements Serializable {
/**
* partition id
*/
private Integer partitionId;
/**
* kafka集群中的中央控制器选举次数
*/
@JsonProperty("controller_epoch")
private Integer controllerEpoch;
/**
* Partition所属的leader broker编号
*/
private Integer leader;
/**
* partition的版本号
*/
private Integer version;
/**
* 该partition leader选举次数
*/
@JsonProperty("leader_epoch")
private Integer leaderEpoch;
/**
* 同步副本组brokerId列表
*/
private List<Integer> isr;
}

View File

@@ -0,0 +1,55 @@
package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import java.io.Serializable;
import java.util.Set;
/**
* 存储Topic的元信息, 元信息对应的ZK节点是/brokers/topics/${topicName}
* @author zengqiao
* @date 19/4/3
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class TopicMetadata implements Serializable {
/**
* topic名称
*/
private String topic;
/**
* partition所在的Broker
*/
private PartitionMap partitionMap;
/**
* topic所在的broker, 由partitionMap获取得到
*/
private Set<Integer> brokerIdSet;
/**
* 副本数
*/
private Integer replicaNum;
/**
* 分区数
*/
private Integer partitionNum;
/**
* 修改节点的时间
*/
private Long modifyTime;
/**
* 创建节点的时间
*/
private Long createTime;
}

View File

@@ -0,0 +1,18 @@
package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.config;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* @author zengqiao
* @date 20/5/14
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class ConfigChangeNotificationBaseData {
protected Integer version;
}

View File

@@ -0,0 +1,25 @@
package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.config;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* @author zengqiao
* @date 20/5/14
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class ConfigChangeNotificationDataV1 extends ConfigChangeNotificationBaseData {
public static final Integer CHANGE_DATA_VERSION = 1;
@JsonProperty("entity_type")
private String entityType;
@JsonProperty("entity_name")
private String entityName;
}

View File

@@ -0,0 +1,29 @@
package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.config;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* @author zengqiao
* @date 20/5/14
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class ConfigChangeNotificationDataV2 extends ConfigChangeNotificationBaseData {
public static final Integer CHANGE_DATA_VERSION = 2;
@JsonProperty("entity_path")
private String entityPath;
public static ConfigChangeNotificationDataV2 getChangeData(String entityPath) {
ConfigChangeNotificationDataV2 changeData = new ConfigChangeNotificationDataV2();
changeData.setEntityPath(entityPath);
changeData.setVersion(CHANGE_DATA_VERSION);
return changeData;
}
}

View File

@@ -0,0 +1,22 @@
package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.config;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
/**
* @author zengqiao
* @date 20/5/12
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class ConfigNodeData<T> {
public static final Integer CONFIGDATA_VERSION = 1;
private T config;
private Integer version;
}