diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/cluster/connect/ClusterPhyDeletedEvent.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/cluster/connect/ClusterPhyDeletedEvent.java new file mode 100644 index 00000000..29e19bc6 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/event/cluster/connect/ClusterPhyDeletedEvent.java @@ -0,0 +1,16 @@ +package com.xiaojukeji.know.streaming.km.common.bean.event.cluster.connect; + +import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyBaseEvent; +import lombok.Getter; + +/** + * 集群删除事件 + * @author zengqiao + * @date 23/08/15 + */ +@Getter +public class ClusterPhyDeletedEvent extends ClusterPhyBaseEvent { + public ClusterPhyDeletedEvent(Object source, Long clusterPhyId) { + super(source, clusterPhyId); + } +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java index 43a0557c..6f9c5cfa 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java @@ -8,6 +8,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyAddedEvent; +import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.connect.ClusterPhyDeletedEvent; import com.xiaojukeji.know.streaming.km.common.bean.po.cluster.ClusterPhyPO; import com.xiaojukeji.know.streaming.km.common.component.SpringTool; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; @@ -146,6 +147,9 @@ public class ClusterPhyServiceImpl implements ClusterPhyService { String.format("删除集群:%s",clusterPhy.toString())); opLogWrapService.saveOplogAndIgnoreException(oplogDTO); + // 发布删除集群事件 + SpringTool.publish(new ClusterPhyDeletedEvent(this, clusterPhyId)); + return Result.buildSuc(); } catch (Exception e) { log.error("method=removeClusterPhyById||clusterPhyId={}||operator={}||msg=remove cluster failed||errMsg=exception!", diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/ConnectClusterService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/ConnectClusterService.java index e6ad3929..621a5643 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/ConnectClusterService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/ConnectClusterService.java @@ -4,14 +4,16 @@ package com.xiaojukeji.know.streaming.km.core.service.connect.cluster; import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.cluster.ConnectClusterDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectClusterMetadata; +import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSGroupDescription; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.core.service.meta.MetaDataService; import java.util.List; /** * Connect-Cluster */ -public interface ConnectClusterService { +public interface ConnectClusterService extends MetaDataService { Long replaceAndReturnIdInDB(ConnectClusterMetadata metadata); List listByKafkaCluster(Long kafkaClusterPhyId); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/impl/ConnectClusterServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/impl/ConnectClusterServiceImpl.java index 86879662..c0908b33 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/impl/ConnectClusterServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/cluster/impl/ConnectClusterServiceImpl.java @@ -38,6 +38,14 @@ public class ConnectClusterServiceImpl implements ConnectClusterService { @Autowired private OpLogWrapService opLogWrapService; + @Override + public int deleteInDBByKafkaClusterId(Long clusterPhyId) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(ConnectClusterPO::getKafkaClusterPhyId, clusterPhyId); + + return connectClusterDAO.deleteById(lambdaQueryWrapper); + } + @Override public Long replaceAndReturnIdInDB(ConnectClusterMetadata metadata) { ConnectClusterPO oldPO = this.getPOFromDB(metadata.getKafkaClusterPhyId(), metadata.getGroupName()); diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/listener/TaskClusterDeletedListener.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/listener/TaskClusterDeletedListener.java new file mode 100644 index 00000000..b10d61be --- /dev/null +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/service/listener/TaskClusterDeletedListener.java @@ -0,0 +1,53 @@ +package com.xiaojukeji.know.streaming.km.task.service.listener; + +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.connect.ClusterPhyDeletedEvent; +import com.xiaojukeji.know.streaming.km.common.component.SpringTool; +import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils; +import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; +import com.xiaojukeji.know.streaming.km.core.service.meta.MetaDataService; +import org.springframework.context.ApplicationListener; +import org.springframework.stereotype.Service; + +@Service +public class TaskClusterDeletedListener implements ApplicationListener { + private static final ILog LOGGER = LogFactory.getLog(TaskClusterDeletedListener.class); + + @Override + public void onApplicationEvent(ClusterPhyDeletedEvent event) { + LOGGER.info("method=onApplicationEvent||clusterPhyId={}||msg=listened delete cluster", event.getClusterPhyId()); + + // 交由KS自定义的线程池,异步执行任务 + FutureUtil.quickStartupFutureUtil.submitTask( + () -> { + // 延迟60秒,避免正在运行的任务,将数据写入DB中 + BackoffUtils.backoff(60000); + + for (MetaDataService metaDataService: SpringTool.getBeansOfType(MetaDataService.class).values()) { + LOGGER.info( + "method=onApplicationEvent||clusterPhyId={}||className={}||msg=delete cluster data in db starting", + event.getClusterPhyId(), metaDataService.getClass().getSimpleName() + ); + + try { + // 删除数据 + metaDataService.deleteInDBByKafkaClusterId(event.getClusterPhyId()); + + LOGGER.info( + "method=onApplicationEvent||clusterPhyId={}||className={}||msg=delete cluster data in db finished", + event.getClusterPhyId(), metaDataService.getClass().getSimpleName() + ); + } catch (Exception e) { + LOGGER.error( + "method=onApplicationEvent||clusterPhyId={}||className={}||msg=delete cluster data in db failed||errMsg=exception", + event.getClusterPhyId(), metaDataService.getClass().getSimpleName(), e + ); + } + } + } + ); + + + } +}