From d1417bef8c39795910cda0e75f03b2224b6334ac Mon Sep 17 00:00:00 2001 From: EricZeng Date: Wed, 16 Aug 2023 10:54:58 +0800 Subject: [PATCH] =?UTF-8?q?[Bugfix]=E4=BF=AE=E5=A4=8D=E5=88=A0=E9=99=A4Kaf?= =?UTF-8?q?ka=E9=9B=86=E7=BE=A4=E5=90=8E=EF=BC=8CConnect=E9=9B=86=E7=BE=A4?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E5=87=BA=E7=8E=B0NPE=E9=97=AE=E9=A2=98=20(#1?= =?UTF-8?q?129)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 原因: 首先,删除Kafka集群后,没有将DB中的Connect集群进行删除。随后,进行Connect集群指标采集时,由于所在的Kafka集群已经不存在了。最终,导致NPE; 解决: 发布一个Kafka集群删除事件,触发MetaDataService子类,将其在DB中的数据进行删除。 遗留: 当前MetaDataService仅在部分元信息同步类中实现,导致当前DB中的脏数据清理不彻底,后续等MetaDataService在所有元信息同步类中实现后,便可彻底清理数据。 PS:当前修复已保证NPE问题不会再出现。 --- .../connect/ClusterPhyDeletedEvent.java | 16 ++++++ .../cluster/impl/ClusterPhyServiceImpl.java | 4 ++ .../cluster/ConnectClusterService.java | 4 +- .../impl/ConnectClusterServiceImpl.java | 8 +++ .../listener/TaskClusterDeletedListener.java | 53 +++++++++++++++++++ 5 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/cluster/connect/ClusterPhyDeletedEvent.java create mode 100644 km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/listener/TaskClusterDeletedListener.java diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/cluster/connect/ClusterPhyDeletedEvent.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/cluster/connect/ClusterPhyDeletedEvent.java new file mode 100644 index 00000000..29e19bc6 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/cluster/connect/ClusterPhyDeletedEvent.java @@ -0,0 +1,16 @@ +package com.xiaojukeji.know.streaming.km.common.bean.event.cluster.connect; + +import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyBaseEvent; +import lombok.Getter; + +/** + * 集群删除事件 + * @author zengqiao + * @date 23/08/15 + */ +@Getter +public class ClusterPhyDeletedEvent extends ClusterPhyBaseEvent { + public ClusterPhyDeletedEvent(Object source, Long clusterPhyId) { + super(source, clusterPhyId); + } +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java index 43a0557c..6f9c5cfa 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java @@ -8,6 +8,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyAddedEvent; +import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.connect.ClusterPhyDeletedEvent; import com.xiaojukeji.know.streaming.km.common.bean.po.cluster.ClusterPhyPO; import com.xiaojukeji.know.streaming.km.common.component.SpringTool; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; @@ -146,6 +147,9 @@ public class ClusterPhyServiceImpl implements ClusterPhyService { String.format("删除集群:%s",clusterPhy.toString())); opLogWrapService.saveOplogAndIgnoreException(oplogDTO); + // 发布删除集群事件 + SpringTool.publish(new ClusterPhyDeletedEvent(this, clusterPhyId)); + return Result.buildSuc(); } catch (Exception e) { log.error("method=removeClusterPhyById||clusterPhyId={}||operator={}||msg=remove cluster failed||errMsg=exception!", diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/ConnectClusterService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/ConnectClusterService.java index e6ad3929..621a5643 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/ConnectClusterService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/ConnectClusterService.java @@ -4,14 +4,16 @@ package com.xiaojukeji.know.streaming.km.core.service.connect.cluster; import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.cluster.ConnectClusterDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectClusterMetadata; +import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSGroupDescription; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.core.service.meta.MetaDataService; import java.util.List; /** * Connect-Cluster */ -public interface ConnectClusterService { +public interface ConnectClusterService extends MetaDataService { Long replaceAndReturnIdInDB(ConnectClusterMetadata metadata); List listByKafkaCluster(Long kafkaClusterPhyId); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/impl/ConnectClusterServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/impl/ConnectClusterServiceImpl.java index 86879662..c0908b33 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/impl/ConnectClusterServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/impl/ConnectClusterServiceImpl.java @@ -38,6 +38,14 @@ public class ConnectClusterServiceImpl implements ConnectClusterService { @Autowired private OpLogWrapService opLogWrapService; + @Override + public int deleteInDBByKafkaClusterId(Long clusterPhyId) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(ConnectClusterPO::getKafkaClusterPhyId, clusterPhyId); + + return connectClusterDAO.deleteById(lambdaQueryWrapper); + } + @Override public Long replaceAndReturnIdInDB(ConnectClusterMetadata metadata) { ConnectClusterPO oldPO = this.getPOFromDB(metadata.getKafkaClusterPhyId(), metadata.getGroupName()); diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/listener/TaskClusterDeletedListener.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/listener/TaskClusterDeletedListener.java new file mode 100644 index 00000000..b10d61be --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/listener/TaskClusterDeletedListener.java @@ -0,0 +1,53 @@ +package com.xiaojukeji.know.streaming.km.task.service.listener; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.connect.ClusterPhyDeletedEvent; +import com.xiaojukeji.know.streaming.km.common.component.SpringTool; +import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils; +import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; +import com.xiaojukeji.know.streaming.km.core.service.meta.MetaDataService; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Service; + +@Service +public class TaskClusterDeletedListener implements ApplicationListener { + private static final ILog LOGGER = LogFactory.getLog(TaskClusterDeletedListener.class); + + @Override + public void onApplicationEvent(ClusterPhyDeletedEvent event) { + LOGGER.info("method=onApplicationEvent||clusterPhyId={}||msg=listened delete cluster", event.getClusterPhyId()); + + // 交由KS自定义的线程池,异步执行任务 + FutureUtil.quickStartupFutureUtil.submitTask( + () -> { + // 延迟60秒,避免正在运行的任务,将数据写入DB中 + BackoffUtils.backoff(60000); + + for (MetaDataService metaDataService: SpringTool.getBeansOfType(MetaDataService.class).values()) { + LOGGER.info( + "method=onApplicationEvent||clusterPhyId={}||className={}||msg=delete cluster data in db starting", + event.getClusterPhyId(), metaDataService.getClass().getSimpleName() + ); + + try { + // 删除数据 + metaDataService.deleteInDBByKafkaClusterId(event.getClusterPhyId()); + + LOGGER.info( + "method=onApplicationEvent||clusterPhyId={}||className={}||msg=delete cluster data in db finished", + event.getClusterPhyId(), metaDataService.getClass().getSimpleName() + ); + } catch (Exception e) { + LOGGER.error( + "method=onApplicationEvent||clusterPhyId={}||className={}||msg=delete cluster data in db failed||errMsg=exception", + event.getClusterPhyId(), metaDataService.getClass().getSimpleName(), e + ); + } + } + } + ); + + + } +}