From 91c60ce72cbd655637663c968c1e7d58f827b9c0 Mon Sep 17 00:00:00 2001
From: zengqiao
Date: Mon, 20 Feb 2023 16:01:29 +0800
Subject: [PATCH] [Bugfix] Fix Controller-Host not being displayed for newly
 connected clusters (#927)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Root cause:
1. For a newly connected cluster, the Broker information is not yet stored in the DB,
   so when the Controller is written to the DB, the Broker lookup against the DB comes back empty.

Fix:
1. Proactively fetch the Broker information once before writing the Controller to the DB
   (a short sketch of the resulting call pattern follows the diffstat below).
---
 .../handler/ControllerNodeChangeHandler.java  |  5 ++-
 .../KafkaControllerService.java               |  2 +-
 .../impl/KafkaControllerServiceImpl.java      | 15 +++------
 .../kafka/metadata/SyncControllerTask.java    | 33 +++++++++++++++++--
 4 files changed, 39 insertions(+), 16 deletions(-)

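Note for reviewers (not part of the patch): a minimal sketch of the call pattern this change
establishes, assuming the handler's kafkaZKDAO and kafkaControllerService fields are in scope as in
ControllerNodeChangeHandler; the wrapping method name is hypothetical. Host and rack fall back to
empty strings whenever the controller's Broker cannot be resolved, e.g. for a newly connected
cluster whose brokers are not yet in the DB.

    // Resolve the controller's Broker first (from ZK here; SyncControllerTask resolves it from Kafka),
    // then persist host/rack together with the controller instead of reading the possibly empty DB.
    private void saveControllerWithHostAndRack(Long clusterPhyId, KafkaController kafkaController) {
        Broker broker = kafkaZKDAO.getBrokerMetadata(clusterPhyId, kafkaController.getBrokerId());

        kafkaControllerService.insertAndIgnoreDuplicateException(
                kafkaController,
                broker != null ? broker.getHost() : "",   // empty host when the broker is unknown
                broker != null ? broker.getRack() : ""    // empty rack when the broker is unknown
        );
    }

Passing host and rack in explicitly also lets KafkaControllerServiceImpl drop its BrokerService
dependency, so the service no longer reads broker data from the DB cache at insert time.
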
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ControllerNodeChangeHandler.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ControllerNodeChangeHandler.java
index b671c4a3..6498d118 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ControllerNodeChangeHandler.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ControllerNodeChangeHandler.java
@@ -2,6 +2,7 @@ package com.xiaojukeji.know.streaming.km.core.flusher.zk.handler;
 
 import com.didiglobal.logi.log.ILog;
 import com.didiglobal.logi.log.LogFactory;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController;
 import com.xiaojukeji.know.streaming.km.common.bean.po.changerecord.KafkaChangeRecordPO;
 import com.xiaojukeji.know.streaming.km.common.constant.Constant;
@@ -100,7 +101,9 @@ public class ControllerNodeChangeHandler extends AbstractZKHandler implements ZN
             if (kafkaController == null) {
                 kafkaControllerService.setNoKafkaController(clusterPhyId, triggerTime);
             } else {
-                kafkaControllerService.insertAndIgnoreDuplicateException(kafkaController);
+                Broker broker = kafkaZKDAO.getBrokerMetadata(clusterPhyId, kafkaController.getBrokerId());
+
+                kafkaControllerService.insertAndIgnoreDuplicateException(kafkaController, broker != null? broker.getHost(): "", broker != null? broker.getRack(): "");
             }
         } catch (Exception e) {
             log.error("method=updateDBData||clusterPhyId={}||errMsg=exception", clusterPhyId, e);
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/KafkaControllerService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/KafkaControllerService.java
index 4ae53f8b..310967e5 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/KafkaControllerService.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/KafkaControllerService.java
@@ -12,7 +12,7 @@ import java.util.Map;
 
 public interface KafkaControllerService {
     Result<KafkaController> getControllerFromKafka(ClusterPhy clusterPhy);
 
-    int insertAndIgnoreDuplicateException(KafkaController kafkaController);
+    int insertAndIgnoreDuplicateException(KafkaController kafkaController, String controllerHost, String controllerRack);
 
     int setNoKafkaController(Long clusterPhyId, Long triggerTime);
diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java
index 6047d844..511693c8 100644
--- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java
+++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java
@@ -5,7 +5,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.didiglobal.logi.log.ILog;
 import com.didiglobal.logi.log.LogFactory;
-import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
@@ -15,7 +14,6 @@ import com.xiaojukeji.know.streaming.km.common.constant.Constant;
 import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
 import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterRunStateEnum;
 import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
-import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
 import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
 import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
 import com.xiaojukeji.know.streaming.km.persistence.mysql.kafkacontroller.KafkaControllerDAO;
@@ -32,9 +30,6 @@ import java.util.*;
 public class KafkaControllerServiceImpl implements KafkaControllerService {
     private static final ILog log = LogFactory.getLog(KafkaControllerServiceImpl.class);
 
-    @Autowired
-    private BrokerService brokerService;
-
     @Autowired
     private KafkaAdminClient kafkaAdminClient;
 
@@ -54,16 +49,14 @@ public class KafkaControllerServiceImpl implements KafkaControllerService {
     }
 
     @Override
-    public int insertAndIgnoreDuplicateException(KafkaController kafkaController) {
+    public int insertAndIgnoreDuplicateException(KafkaController kafkaController, String controllerHost, String controllerRack) {
         try {
-            Broker broker = brokerService.getBrokerFromCacheFirst(kafkaController.getClusterPhyId(), kafkaController.getBrokerId());
-
             KafkaControllerPO kafkaControllerPO = new KafkaControllerPO();
             kafkaControllerPO.setClusterPhyId(kafkaController.getClusterPhyId());
             kafkaControllerPO.setBrokerId(kafkaController.getBrokerId());
             kafkaControllerPO.setTimestamp(kafkaController.getTimestamp());
-            kafkaControllerPO.setBrokerHost(broker != null? broker.getHost(): "");
-            kafkaControllerPO.setBrokerRack(broker != null? broker.getRack(): "");
+            kafkaControllerPO.setBrokerHost(controllerHost != null? controllerHost: "");
+            kafkaControllerPO.setBrokerRack(controllerRack != null? controllerRack: "");
             kafkaControllerDAO.insert(kafkaControllerPO);
         } catch (DuplicateKeyException dke) {
             // ignore
@@ -92,7 +85,7 @@ public class KafkaControllerServiceImpl implements KafkaControllerService {
         // 归一化到秒, 并且将去1秒,避免gc导致时间不对
         noKafkaController.setTimestamp(triggerTime);
 
-        return this.insertAndIgnoreDuplicateException(noKafkaController);
+        return this.insertAndIgnoreDuplicateException(noKafkaController, "", "");
     }
 
     @Override
diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncControllerTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncControllerTask.java
index ea0a7696..45b4ad03 100644
--- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncControllerTask.java
+++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncControllerTask.java
@@ -5,12 +5,16 @@ import com.didiglobal.logi.job.common.TaskResult;
 
 import com.didiglobal.logi.job.core.consensual.ConsensualEnum;
 import com.didiglobal.logi.log.ILog;
 import com.didiglobal.logi.log.LogFactory;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController;
 import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
+import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
 import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService;
 import org.springframework.beans.factory.annotation.Autowired;
 
+import java.util.List;
+
 @Task(name = "SyncControllerTask",
         description = "Controller信息同步到DB",
@@ -19,7 +23,10 @@ import org.springframework.beans.factory.annotation.Autowired;
         consensual = ConsensualEnum.BROADCAST,
         timeout = 2 * 60)
 public class SyncControllerTask extends AbstractAsyncMetadataDispatchTask {
-    private static final ILog log = LogFactory.getLog(SyncControllerTask.class);
+    private static final ILog LOGGER = LogFactory.getLog(SyncControllerTask.class);
+
+    @Autowired
+    private BrokerService brokerService;
 
     @Autowired
     private KafkaControllerService kafkaControllerService;
@@ -33,10 +40,30 @@ public class SyncControllerTask extends AbstractAsyncMetadataDispatchTask {
 
         if (controllerResult.getData() == null) {
             kafkaControllerService.setNoKafkaController(clusterPhy.getId(), System.currentTimeMillis() / 1000L * 1000L);
-        } else {
-            kafkaControllerService.insertAndIgnoreDuplicateException(controllerResult.getData());
+
+            return TaskResult.SUCCESS;
         }
+
+        Broker controllerBroker = null;
+
+        Result<List<Broker>> brokerListResult = brokerService.listBrokersFromKafka(clusterPhy);
+        if (brokerListResult.failed()) {
+            LOGGER.error("method=processClusterTask||clusterPhyId={}||result={}||errMsg=list brokers failed", clusterPhy.getId(), brokerListResult);
+        } else {
+            for (Broker broker: brokerListResult.getData()) {
+                if (broker.getBrokerId().equals(controllerResult.getData().getBrokerId())) {
+                    controllerBroker = broker;
+                }
+            }
+        }
+
+        kafkaControllerService.insertAndIgnoreDuplicateException(
+                controllerResult.getData(),
+                controllerBroker != null? controllerBroker.getHost(): "",
+                controllerBroker != null? controllerBroker.getRack(): ""
+        );
+
 
         return TaskResult.SUCCESS;
     }
 }
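
Reviewer note (illustrative only, not part of the patch): the broker lookup loop added to
SyncControllerTask keeps the last matching entry; assuming broker ids are unique in the list
returned from Kafka, it is equivalent to the Stream-based form below, with identifiers taken
from the patch.

    // Pick the broker whose id matches the current controller, or null if it is absent from the list.
    Broker controllerBroker = brokerListResult.getData().stream()
            .filter(broker -> broker.getBrokerId().equals(controllerResult.getData().getBrokerId()))
            .findFirst()
            .orElse(null);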