diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ControllerNodeChangeHandler.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ControllerNodeChangeHandler.java index b671c4a3..6498d118 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ControllerNodeChangeHandler.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/flusher/zk/handler/ControllerNodeChangeHandler.java @@ -2,6 +2,7 @@ package com.xiaojukeji.know.streaming.km.core.flusher.zk.handler; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker; import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController; import com.xiaojukeji.know.streaming.km.common.bean.po.changerecord.KafkaChangeRecordPO; import com.xiaojukeji.know.streaming.km.common.constant.Constant; @@ -100,7 +101,9 @@ public class ControllerNodeChangeHandler extends AbstractZKHandler implements ZN if (kafkaController == null) { kafkaControllerService.setNoKafkaController(clusterPhyId, triggerTime); } else { - kafkaControllerService.insertAndIgnoreDuplicateException(kafkaController); + Broker broker = kafkaZKDAO.getBrokerMetadata(clusterPhyId, kafkaController.getBrokerId()); + + kafkaControllerService.insertAndIgnoreDuplicateException(kafkaController, broker != null? broker.getHost(): "", broker != null? broker.getRack(): ""); } } catch (Exception e) { log.error("method=updateDBData||clusterPhyId={}||errMsg=exception", clusterPhyId, e); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/KafkaControllerService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/KafkaControllerService.java index 4ae53f8b..310967e5 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/KafkaControllerService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/KafkaControllerService.java @@ -12,7 +12,7 @@ import java.util.Map; public interface KafkaControllerService { Result getControllerFromKafka(ClusterPhy clusterPhy); - int insertAndIgnoreDuplicateException(KafkaController kafkaController); + int insertAndIgnoreDuplicateException(KafkaController kafkaController, String controllerHost, String controllerRack); int setNoKafkaController(Long clusterPhyId, Long triggerTime); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java index 6047d844..511693c8 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkacontroller/impl/KafkaControllerServiceImpl.java @@ -5,7 +5,6 @@ import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; -import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; @@ -15,7 +14,6 @@ import com.xiaojukeji.know.streaming.km.common.constant.Constant; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterRunStateEnum; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; -import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; import com.xiaojukeji.know.streaming.km.persistence.mysql.kafkacontroller.KafkaControllerDAO; @@ -32,9 +30,6 @@ import java.util.*; public class KafkaControllerServiceImpl implements KafkaControllerService { private static final ILog log = LogFactory.getLog(KafkaControllerServiceImpl.class); - @Autowired - private BrokerService brokerService; - @Autowired private KafkaAdminClient kafkaAdminClient; @@ -54,16 +49,14 @@ public class KafkaControllerServiceImpl implements KafkaControllerService { } @Override - public int insertAndIgnoreDuplicateException(KafkaController kafkaController) { + public int insertAndIgnoreDuplicateException(KafkaController kafkaController, String controllerHost, String controllerRack) { try { - Broker broker = brokerService.getBrokerFromCacheFirst(kafkaController.getClusterPhyId(), kafkaController.getBrokerId()); - KafkaControllerPO kafkaControllerPO = new KafkaControllerPO(); kafkaControllerPO.setClusterPhyId(kafkaController.getClusterPhyId()); kafkaControllerPO.setBrokerId(kafkaController.getBrokerId()); kafkaControllerPO.setTimestamp(kafkaController.getTimestamp()); - kafkaControllerPO.setBrokerHost(broker != null? broker.getHost(): ""); - kafkaControllerPO.setBrokerRack(broker != null? broker.getRack(): ""); + kafkaControllerPO.setBrokerHost(controllerHost != null? controllerHost: ""); + kafkaControllerPO.setBrokerRack(controllerRack != null? controllerRack: ""); kafkaControllerDAO.insert(kafkaControllerPO); } catch (DuplicateKeyException dke) { // ignore @@ -92,7 +85,7 @@ public class KafkaControllerServiceImpl implements KafkaControllerService { // 归一化到秒, 并且将去1秒,避免gc导致时间不对 noKafkaController.setTimestamp(triggerTime); - return this.insertAndIgnoreDuplicateException(noKafkaController); + return this.insertAndIgnoreDuplicateException(noKafkaController, "", ""); } @Override diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncControllerTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncControllerTask.java index ea0a7696..45b4ad03 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncControllerTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncControllerTask.java @@ -5,12 +5,16 @@ import com.didiglobal.logi.job.common.TaskResult; import com.didiglobal.logi.job.core.consensual.ConsensualEnum; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkacontroller.KafkaController; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.kafkacontroller.KafkaControllerService; import org.springframework.beans.factory.annotation.Autowired; +import java.util.List; + @Task(name = "SyncControllerTask", description = "Controller信息同步到DB", @@ -19,7 +23,10 @@ import org.springframework.beans.factory.annotation.Autowired; consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) public class SyncControllerTask extends AbstractAsyncMetadataDispatchTask { - private static final ILog log = LogFactory.getLog(SyncControllerTask.class); + private static final ILog LOGGER = LogFactory.getLog(SyncControllerTask.class); + + @Autowired + private BrokerService brokerService; @Autowired private KafkaControllerService kafkaControllerService; @@ -33,10 +40,30 @@ public class SyncControllerTask extends AbstractAsyncMetadataDispatchTask { if (controllerResult.getData() == null) { kafkaControllerService.setNoKafkaController(clusterPhy.getId(), System.currentTimeMillis() / 1000L * 1000L); - } else { - kafkaControllerService.insertAndIgnoreDuplicateException(controllerResult.getData()); + + return TaskResult.SUCCESS; } + + Broker controllerBroker = null; + + Result> brokerListResult = brokerService.listBrokersFromKafka(clusterPhy); + if (brokerListResult.failed()) { + LOGGER.error("method=processClusterTask||clusterPhyId={}||result={}||errMsg=list brokers failed", clusterPhy.getId(), brokerListResult); + } else { + for (Broker broker: brokerListResult.getData()) { + if (broker.getBrokerId().equals(controllerResult.getData().getBrokerId())) { + controllerBroker = broker; + } + } + } + + kafkaControllerService.insertAndIgnoreDuplicateException( + controllerResult.getData(), + controllerBroker != null? controllerBroker.getHost(): "", + controllerBroker != null? controllerBroker.getRack(): "" + ); + return TaskResult.SUCCESS; } }