From 725ac10c3d881c00ffefccdfc5b338adbe3db963 Mon Sep 17 00:00:00 2001 From: zengqiao Date: Thu, 22 Sep 2022 11:30:46 +0800 Subject: [PATCH] =?UTF-8?q?1=E3=80=81=E8=B0=83=E6=95=B4KafkaZKDao=E4=BD=8D?= =?UTF-8?q?=E7=BD=AE=EF=BC=9B2=E3=80=81offset=E4=BF=A1=E6=81=AF=E8=8E=B7?= =?UTF-8?q?=E5=8F=96=E6=97=B6=EF=BC=8C=E8=BF=87=E6=BB=A4=E6=8E=89=E6=97=A0?= =?UTF-8?q?leader=E5=88=86=E5=8C=BA=EF=BC=9B3=E3=80=81=E8=B0=83=E6=95=B4?= =?UTF-8?q?=E9=AA=8C=E8=AF=81ZK=E6=98=AF=E5=90=A6=E5=90=88=E6=B3=95?= =?UTF-8?q?=E6=97=B6=E7=9A=84session=E8=B6=85=E6=97=B6=E6=97=B6=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/common/bean/entity/broker/Broker.java | 15 ---------- .../km/common/constant/MsgConstant.java | 4 +++ .../km/core/flusher/zk/AbstractZKWatcher.java | 2 +- .../flusher/zk/handler/AbstractZKHandler.java | 2 +- .../zk/handler/BrokersNodeChangeHandler.java | 2 +- .../ConfigNotificationNodeChangeHandler.java | 8 ++--- .../handler/ControllerNodeChangeHandler.java | 2 +- .../zk/handler/TopicsNodeChangeHandler.java | 2 +- .../broker/impl/BrokerServiceImpl.java | 8 ++--- .../impl/ClusterValidateServiceImpl.java | 4 +-- .../impl/KafkaControllerServiceImpl.java | 2 +- .../partition/impl/PartitionServiceImpl.java | 22 ++++++++++---- .../topic/impl/OpTopicServiceImpl.java | 2 +- .../topic/impl/TopicConfigServiceImpl.java | 2 +- .../service/topic/impl/TopicServiceImpl.java | 2 +- .../kafka/zookeeper/package-info.java | 4 +++ .../zookeeper/service}/KafkaZKDAO.java | 2 +- .../service}/impl/KafkaZKDAOImpl.java | 30 ++++++++++++++----- .../zookeeper/znode/ControllerData.java | 2 +- .../znode/brokers/BrokerMetadata.java | 2 +- .../zookeeper/znode/brokers/PartitionMap.java | 2 +- .../znode/brokers/PartitionState.java | 2 +- .../znode/brokers/TopicMetadata.java | 2 +- .../ConfigChangeNotificationBaseData.java | 2 +- .../ConfigChangeNotificationDataV1.java | 2 +- .../ConfigChangeNotificationDataV2.java | 2 +- .../znode/config/ConfigNodeData.java | 2 +- 27 files changed, 74 insertions(+), 59 deletions(-) create mode 100644 km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/package-info.java rename km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/{zk => kafka/zookeeper/service}/KafkaZKDAO.java (97%) rename km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/{zk => kafka/zookeeper/service}/impl/KafkaZKDAOImpl.java (90%) rename {km-common/src/main/java/com/xiaojukeji/know/streaming/km/common => km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka}/zookeeper/znode/ControllerData.java (81%) rename {km-common/src/main/java/com/xiaojukeji/know/streaming/km/common => km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka}/zookeeper/znode/brokers/BrokerMetadata.java (97%) rename {km-common/src/main/java/com/xiaojukeji/know/streaming/km/common => km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka}/zookeeper/znode/brokers/PartitionMap.java (91%) rename {km-common/src/main/java/com/xiaojukeji/know/streaming/km/common => km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka}/zookeeper/znode/brokers/PartitionState.java (93%) rename {km-common/src/main/java/com/xiaojukeji/know/streaming/km/common => km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka}/zookeeper/znode/brokers/TopicMetadata.java (91%) rename {km-common/src/main/java/com/xiaojukeji/know/streaming/km/common => km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka}/zookeeper/znode/config/ConfigChangeNotificationBaseData.java (77%) rename {km-common/src/main/java/com/xiaojukeji/know/streaming/km/common => km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka}/zookeeper/znode/config/ConfigChangeNotificationDataV1.java (86%) rename {km-common/src/main/java/com/xiaojukeji/know/streaming/km/common => km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka}/zookeeper/znode/config/ConfigChangeNotificationDataV2.java (90%) rename {km-common/src/main/java/com/xiaojukeji/know/streaming/km/common => km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka}/zookeeper/znode/config/ConfigNodeData.java (81%) diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java index fa67cac5..752aade0 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/broker/Broker.java @@ -5,7 +5,6 @@ import com.alibaba.fastjson.TypeReference; import com.xiaojukeji.know.streaming.km.common.bean.entity.common.IpPortData; import com.xiaojukeji.know.streaming.km.common.bean.po.broker.BrokerPO; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; -import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.BrokerMetadata; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @@ -79,20 +78,6 @@ public class Broker implements Serializable { return metadata; } - public static Broker buildFrom(Long clusterPhyId, Integer brokerId, BrokerMetadata brokerMetadata) { - Broker metadata = new Broker(); - metadata.setClusterPhyId(clusterPhyId); - metadata.setBrokerId(brokerId); - metadata.setHost(brokerMetadata.getHost()); - metadata.setPort(brokerMetadata.getPort()); - metadata.setJmxPort(brokerMetadata.getJmxPort()); - metadata.setStartTimestamp(brokerMetadata.getTimestamp()); - metadata.setRack(brokerMetadata.getRack()); - metadata.setStatus(1); - metadata.setEndpointMap(brokerMetadata.getEndpointMap()); - return metadata; - } - public static Broker buildFrom(BrokerPO brokerPO) { Broker broker = ConvertUtil.obj2Obj(brokerPO, Broker.class); String endpointMapStr = brokerPO.getEndpointMap(); diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/MsgConstant.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/MsgConstant.java index 3d0b6a5c..1be8dadf 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/MsgConstant.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/constant/MsgConstant.java @@ -52,6 +52,10 @@ public class MsgConstant { /**************************************************** Partition ****************************************************/ + public static String getPartitionNoLeader(Long clusterPhyId, String topicName) { + return String.format("集群ID:[%d] Topic名称:[%s] 所有分区NoLeader", clusterPhyId, topicName); + } + public static String getPartitionNotExist(Long clusterPhyId, String topicName) { return String.format("集群ID:[%d] Topic名称:[%s] 存在非法的分区ID", clusterPhyId, topicName); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/AbstractZKWatcher.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/AbstractZKWatcher.java index 261aff0a..e43f1b40 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/AbstractZKWatcher.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/AbstractZKWatcher.java @@ -5,7 +5,7 @@ import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.zk.KafkaZkClient; import org.springframework.beans.factory.annotation.Autowired; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/AbstractZKHandler.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/AbstractZKHandler.java index e2ed09d1..04a14e87 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/AbstractZKHandler.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/AbstractZKHandler.java @@ -7,7 +7,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils; import com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService; import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; public abstract class AbstractZKHandler { diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/BrokersNodeChangeHandler.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/BrokersNodeChangeHandler.java index b7c93c2f..314195af 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/BrokersNodeChangeHandler.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/BrokersNodeChangeHandler.java @@ -9,7 +9,7 @@ import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.zk.BrokerIdsZNode; import kafka.zookeeper.ZNodeChildChangeHandler; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ConfigNotificationNodeChangeHandler.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ConfigNotificationNodeChangeHandler.java index 91d91571..1e626632 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ConfigNotificationNodeChangeHandler.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ConfigNotificationNodeChangeHandler.java @@ -8,11 +8,11 @@ import com.xiaojukeji.know.streaming.km.common.enums.KafkaConfigTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum; import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; import com.xiaojukeji.know.streaming.km.common.utils.Tuple; -import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.config.ConfigChangeNotificationBaseData; -import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.config.ConfigChangeNotificationDataV1; -import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.config.ConfigChangeNotificationDataV2; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.config.ConfigChangeNotificationBaseData; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.config.ConfigChangeNotificationDataV1; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.config.ConfigChangeNotificationDataV2; import com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.zk.ConfigEntityChangeNotificationZNode; import kafka.zookeeper.ZNodeChildChangeHandler; import org.apache.zookeeper.data.Stat; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ControllerNodeChangeHandler.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ControllerNodeChangeHandler.java index 904b7d72..b671c4a3 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ControllerNodeChangeHandler.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ControllerNodeChangeHandler.java @@ -11,7 +11,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils; import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; import com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService; import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.zk.ControllerZNode; import kafka.zookeeper.ZNodeChangeHandler; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/TopicsNodeChangeHandler.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/TopicsNodeChangeHandler.java index 31602632..88c01281 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/TopicsNodeChangeHandler.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/TopicsNodeChangeHandler.java @@ -9,7 +9,7 @@ import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; import com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.zk.TopicsZNode; import kafka.zookeeper.ZNodeChildChangeHandler; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java index dc702388..fbede23c 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/broker/impl/BrokerServiceImpl.java @@ -24,7 +24,6 @@ import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistExcept import com.xiaojukeji.know.streaming.km.common.jmx.JmxConnectorWrap; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; -import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.BrokerMetadata; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService; @@ -32,8 +31,7 @@ import com.xiaojukeji.know.streaming.km.persistence.jmx.JmxDAO; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaJMXClient; import com.xiaojukeji.know.streaming.km.persistence.mysql.broker.BrokerDAO; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; -import kafka.zk.BrokerIdZNode; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.zk.BrokerIdsZNode; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.Node; @@ -310,9 +308,7 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok List brokerIdList = kafkaZKDAO.getChildren(clusterPhy.getId(), BrokerIdsZNode.path(), false); for (String brokerId: brokerIdList) { - BrokerMetadata metadata = kafkaZKDAO.getData(clusterPhy.getId(), BrokerIdZNode.path(Integer.valueOf(brokerId)), BrokerMetadata.class); - BrokerMetadata.parseAndUpdateBrokerMetadata(metadata); - brokerList.add(Broker.buildFrom(clusterPhy.getId(), Integer.valueOf(brokerId), metadata)); + brokerList.add(kafkaZKDAO.getBrokerMetadata(clusterPhy.getId(), Integer.valueOf(brokerId))); } return Result.buildSuc(brokerList); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterValidateServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterValidateServiceImpl.java index 6dcd858e..ba72d2fe 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterValidateServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterValidateServiceImpl.java @@ -13,8 +13,8 @@ import com.xiaojukeji.know.streaming.km.common.enums.valid.ValidateKafkaAddressE import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterValidateService; import com.xiaojukeji.know.streaming.km.persistence.jmx.JmxDAO; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; -import com.xiaojukeji.know.streaming.km.persistence.zk.impl.KafkaZKDAOImpl; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.impl.KafkaZKDAOImpl; import kafka.server.KafkaConfig; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.*; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java index 1fb3f488..8048eabe 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java @@ -19,7 +19,7 @@ import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; import com.xiaojukeji.know.streaming.km.persistence.mysql.kafkacontroller.KafkaControllerDAO; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.Node; import org.springframework.beans.factory.annotation.Autowired; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java index 13eedb41..1795e4d4 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/partition/impl/PartitionServiceImpl.java @@ -21,14 +21,14 @@ import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException; import com.xiaojukeji.know.streaming.km.common.utils.CommonUtils; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; -import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.PartitionMap; -import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.PartitionState; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.PartitionMap; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.PartitionState; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaConsumerClient; import com.xiaojukeji.know.streaming.km.persistence.mysql.partition.PartitionDAO; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.zk.TopicPartitionStateZNode; import kafka.zk.TopicPartitionsZNode; import kafka.zk.TopicZNode; @@ -202,10 +202,22 @@ public class PartitionServiceImpl extends BaseVersionControlService implements P @Override public Result> getPartitionOffsetFromKafka(Long clusterPhyId, String topicName, OffsetSpec offsetSpec, Long timestamp) { Map topicPartitionOffsets = new HashMap<>(); - this.listPartitionByTopic(clusterPhyId, topicName) - .stream() + + List partitionList = this.listPartitionByTopic(clusterPhyId, topicName); + if (partitionList == null || partitionList.isEmpty()) { + // Topic不存在 + return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getTopicNotExist(clusterPhyId, topicName)); + } + + partitionList.stream() + .filter(item -> !item.getLeaderBrokerId().equals(KafkaConstant.NO_LEADER)) .forEach(elem -> topicPartitionOffsets.put(new TopicPartition(topicName, elem.getPartitionId()), offsetSpec)); + if (topicPartitionOffsets.isEmpty()) { + // 所有分区no-leader + return Result.buildFromRSAndMsg(ResultStatus.OPERATION_FAILED, MsgConstant.getPartitionNoLeader(clusterPhyId, topicName)); + } + try { return (Result>) doVCHandler(clusterPhyId, PARTITION_OFFSET_GET, new PartitionOffsetParam(clusterPhyId, topicName, topicPartitionOffsets, timestamp)); } catch (VCHandlerNotExistException e) { diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/OpTopicServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/OpTopicServiceImpl.java index 7f289c88..7cd017f4 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/OpTopicServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/OpTopicServiceImpl.java @@ -23,7 +23,7 @@ import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.controller.ReplicaAssignment; import kafka.server.ConfigType; import kafka.zk.AdminZkClient; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicConfigServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicConfigServiceImpl.java index 9aaadee5..09be0d43 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicConfigServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicConfigServiceImpl.java @@ -30,7 +30,7 @@ import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.server.ConfigType; import kafka.zk.AdminZkClient; import kafka.zk.KafkaZkClient; diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java index bffabec8..e2870d9d 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java @@ -23,7 +23,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; import com.xiaojukeji.know.streaming.km.persistence.mysql.topic.TopicDAO; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.zk.TopicsZNode; import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.TopicPartitionInfo; diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/package-info.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/package-info.java new file mode 100644 index 00000000..88139db3 --- /dev/null +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/package-info.java @@ -0,0 +1,4 @@ +/** + * 读取Kafka在ZK中存储的数据的包 + */ +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper; \ No newline at end of file diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/zk/KafkaZKDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/service/KafkaZKDAO.java similarity index 97% rename from km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/zk/KafkaZKDAO.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/service/KafkaZKDAO.java index 3e00e558..7a7d4b76 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/zk/KafkaZKDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/service/KafkaZKDAO.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.persistence.zk; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service; import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker; import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController; diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/zk/impl/KafkaZKDAOImpl.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/service/impl/KafkaZKDAOImpl.java similarity index 90% rename from km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/zk/impl/KafkaZKDAOImpl.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/service/impl/KafkaZKDAOImpl.java index 61a7bad0..82cb8130 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/zk/impl/KafkaZKDAOImpl.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/service/impl/KafkaZKDAOImpl.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.persistence.zk.impl; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.impl; import com.alibaba.fastjson.JSON; import com.didiglobal.logi.log.ILog; @@ -11,11 +11,11 @@ import com.xiaojukeji.know.streaming.km.common.enums.topic.TopicTypeEnum; import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; import com.xiaojukeji.know.streaming.km.common.utils.Tuple; -import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.ControllerData; -import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.BrokerMetadata; -import com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers.PartitionMap; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.ControllerData; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.BrokerMetadata; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers.PartitionMap; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient; -import com.xiaojukeji.know.streaming.km.persistence.zk.KafkaZKDAO; +import com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.service.KafkaZKDAO; import kafka.utils.Json; import kafka.zk.*; import kafka.zookeeper.AsyncResponse; @@ -46,14 +46,14 @@ public class KafkaZKDAOImpl implements KafkaZKDAO { public Broker getBrokerMetadata(String zkAddress) throws KeeperException.NoNodeException, AdminOperateException { ZooKeeper zooKeeper = null; try { - zooKeeper = new ZooKeeper(zkAddress, 1000, watchedEvent -> logger.info(" receive event : " + watchedEvent.getType().name())); + zooKeeper = new ZooKeeper(zkAddress, 3000, watchedEvent -> logger.info(" receive event : " + watchedEvent.getType().name())); List brokerIdList = this.getChildren(zooKeeper, BrokerIdsZNode.path()); if (brokerIdList == null || brokerIdList.isEmpty()) { return null; } BrokerMetadata brokerMetadata = this.getData(zooKeeper, BrokerIdZNode.path(Integer.parseInt(brokerIdList.get(0))), false, BrokerMetadata.class); - return Broker.buildFrom(null, Integer.valueOf(brokerIdList.get(0)), brokerMetadata); + return this.convert2Broker(null, Integer.valueOf(brokerIdList.get(0)), brokerMetadata); } catch (KeeperException.NoNodeException nne) { logger.warn("method=getBrokerMetadata||zkAddress={}||errMsg=exception", zkAddress, nne); throw nne; @@ -79,7 +79,7 @@ public class KafkaZKDAOImpl implements KafkaZKDAO { try { BrokerMetadata metadata = this.getData(kafkaZkClient.currentZooKeeper(), BrokerIdZNode.path(brokerId), false, BrokerMetadata.class); BrokerMetadata.parseAndUpdateBrokerMetadata(metadata); - return Broker.buildFrom(clusterPhyId, brokerId, metadata); + return this.convert2Broker(clusterPhyId, brokerId, metadata); } catch (KeeperException ke) { logger.error("method=getBrokerMetadata||clusterPhyId={}||brokerId={}||errMsg=exception", clusterPhyId, brokerId, ke); throw ke; @@ -269,4 +269,18 @@ public class KafkaZKDAOImpl implements KafkaZKDAO { byte[] bytes = zooKeeper.getData(path, addWatch, null); return JSON.parseObject(bytes, clazz); } + + private Broker convert2Broker(Long clusterPhyId, Integer brokerId, BrokerMetadata brokerMetadata) { + Broker metadata = new Broker(); + metadata.setClusterPhyId(clusterPhyId); + metadata.setBrokerId(brokerId); + metadata.setHost(brokerMetadata.getHost()); + metadata.setPort(brokerMetadata.getPort()); + metadata.setJmxPort(brokerMetadata.getJmxPort()); + metadata.setStartTimestamp(brokerMetadata.getTimestamp()); + metadata.setRack(brokerMetadata.getRack()); + metadata.setStatus(1); + metadata.setEndpointMap(brokerMetadata.getEndpointMap()); + return metadata; + } } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/ControllerData.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/ControllerData.java similarity index 81% rename from km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/ControllerData.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/ControllerData.java index f69c6862..afc7f55b 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/ControllerData.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/ControllerData.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.common.zookeeper.znode; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/BrokerMetadata.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/BrokerMetadata.java similarity index 97% rename from km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/BrokerMetadata.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/BrokerMetadata.java index 480867af..3b252c5f 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/BrokerMetadata.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/BrokerMetadata.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/PartitionMap.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/PartitionMap.java similarity index 91% rename from km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/PartitionMap.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/PartitionMap.java index bf1fbd1a..4bc36cac 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/PartitionMap.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/PartitionMap.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/PartitionState.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/PartitionState.java similarity index 93% rename from km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/PartitionState.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/PartitionState.java index 60ae4307..47be5cb9 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/PartitionState.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/PartitionState.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers; import com.fasterxml.jackson.annotation.JsonProperty; import lombok.AllArgsConstructor; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/TopicMetadata.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/TopicMetadata.java similarity index 91% rename from km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/TopicMetadata.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/TopicMetadata.java index 803a5e29..f84c8fcf 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/brokers/TopicMetadata.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/brokers/TopicMetadata.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.common.zookeeper.znode.brokers; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.brokers; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigChangeNotificationBaseData.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigChangeNotificationBaseData.java similarity index 77% rename from km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigChangeNotificationBaseData.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigChangeNotificationBaseData.java index 86a3abe9..09ffee10 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigChangeNotificationBaseData.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigChangeNotificationBaseData.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.common.zookeeper.znode.config; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.config; import lombok.AllArgsConstructor; import lombok.Data; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigChangeNotificationDataV1.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigChangeNotificationDataV1.java similarity index 86% rename from km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigChangeNotificationDataV1.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigChangeNotificationDataV1.java index 75598e65..1853b940 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigChangeNotificationDataV1.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigChangeNotificationDataV1.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.common.zookeeper.znode.config; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.config; import com.fasterxml.jackson.annotation.JsonProperty; import lombok.AllArgsConstructor; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigChangeNotificationDataV2.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigChangeNotificationDataV2.java similarity index 90% rename from km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigChangeNotificationDataV2.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigChangeNotificationDataV2.java index 6b0d8806..5e6024fa 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigChangeNotificationDataV2.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigChangeNotificationDataV2.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.common.zookeeper.znode.config; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.config; import com.fasterxml.jackson.annotation.JsonProperty; import lombok.AllArgsConstructor; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigNodeData.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigNodeData.java similarity index 81% rename from km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigNodeData.java rename to km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigNodeData.java index 13132b4f..287912dc 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/zookeeper/znode/config/ConfigNodeData.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/kafka/zookeeper/znode/config/ConfigNodeData.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.common.zookeeper.znode.config; +package com.xiaojukeji.know.streaming.km.persistence.kafka.zookeeper.znode.config; import lombok.AllArgsConstructor; import lombok.Data;