mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 03:42:07 +08:00
[Bugfix]修复删除Kafka集群后,Connect集群任务出现NPE问题 (#1129)
原因: 首先,删除Kafka集群后,没有将DB中的Connect集群进行删除。随后,进行Connect集群指标采集时,由于所在的Kafka集群已经不存在了。最终,导致NPE; 解决: 发布一个Kafka集群删除事件,触发MetaDataService子类,将其在DB中的数据进行删除。 遗留: 当前MetaDataService仅在部分元信息同步类中实现,导致当前DB中的脏数据清理不彻底,后续等MetaDataService在所有元信息同步类中实现后,便可彻底清理数据。 PS:当前修复已保证NPE问题不会再出现。
This commit is contained in:
@@ -0,0 +1,16 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.common.bean.event.cluster.connect;
|
||||||
|
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyBaseEvent;
|
||||||
|
import lombok.Getter;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 集群删除事件
|
||||||
|
* @author zengqiao
|
||||||
|
* @date 23/08/15
|
||||||
|
*/
|
||||||
|
@Getter
|
||||||
|
public class ClusterPhyDeletedEvent extends ClusterPhyBaseEvent {
|
||||||
|
public ClusterPhyDeletedEvent(Object source, Long clusterPhyId) {
|
||||||
|
super(source, clusterPhyId);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -8,6 +8,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyAddedEvent;
|
import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.ClusterPhyAddedEvent;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.connect.ClusterPhyDeletedEvent;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.po.cluster.ClusterPhyPO;
|
import com.xiaojukeji.know.streaming.km.common.bean.po.cluster.ClusterPhyPO;
|
||||||
import com.xiaojukeji.know.streaming.km.common.component.SpringTool;
|
import com.xiaojukeji.know.streaming.km.common.component.SpringTool;
|
||||||
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
||||||
@@ -146,6 +147,9 @@ public class ClusterPhyServiceImpl implements ClusterPhyService {
|
|||||||
String.format("删除集群:%s",clusterPhy.toString()));
|
String.format("删除集群:%s",clusterPhy.toString()));
|
||||||
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);
|
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);
|
||||||
|
|
||||||
|
// 发布删除集群事件
|
||||||
|
SpringTool.publish(new ClusterPhyDeletedEvent(this, clusterPhyId));
|
||||||
|
|
||||||
return Result.buildSuc();
|
return Result.buildSuc();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
log.error("method=removeClusterPhyById||clusterPhyId={}||operator={}||msg=remove cluster failed||errMsg=exception!",
|
log.error("method=removeClusterPhyById||clusterPhyId={}||operator={}||msg=remove cluster failed||errMsg=exception!",
|
||||||
|
|||||||
@@ -4,14 +4,16 @@ package com.xiaojukeji.know.streaming.km.core.service.connect.cluster;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.cluster.ConnectClusterDTO;
|
import com.xiaojukeji.know.streaming.km.common.bean.dto.connect.cluster.ConnectClusterDTO;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectClusterMetadata;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectClusterMetadata;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSGroupDescription;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||||
|
import com.xiaojukeji.know.streaming.km.core.service.meta.MetaDataService;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Connect-Cluster
|
* Connect-Cluster
|
||||||
*/
|
*/
|
||||||
public interface ConnectClusterService {
|
public interface ConnectClusterService extends MetaDataService<KSGroupDescription> {
|
||||||
Long replaceAndReturnIdInDB(ConnectClusterMetadata metadata);
|
Long replaceAndReturnIdInDB(ConnectClusterMetadata metadata);
|
||||||
|
|
||||||
List<ConnectCluster> listByKafkaCluster(Long kafkaClusterPhyId);
|
List<ConnectCluster> listByKafkaCluster(Long kafkaClusterPhyId);
|
||||||
|
|||||||
@@ -38,6 +38,14 @@ public class ConnectClusterServiceImpl implements ConnectClusterService {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private OpLogWrapService opLogWrapService;
|
private OpLogWrapService opLogWrapService;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int deleteInDBByKafkaClusterId(Long clusterPhyId) {
|
||||||
|
LambdaQueryWrapper<ConnectClusterPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||||
|
lambdaQueryWrapper.eq(ConnectClusterPO::getKafkaClusterPhyId, clusterPhyId);
|
||||||
|
|
||||||
|
return connectClusterDAO.deleteById(lambdaQueryWrapper);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Long replaceAndReturnIdInDB(ConnectClusterMetadata metadata) {
|
public Long replaceAndReturnIdInDB(ConnectClusterMetadata metadata) {
|
||||||
ConnectClusterPO oldPO = this.getPOFromDB(metadata.getKafkaClusterPhyId(), metadata.getGroupName());
|
ConnectClusterPO oldPO = this.getPOFromDB(metadata.getKafkaClusterPhyId(), metadata.getGroupName());
|
||||||
|
|||||||
@@ -0,0 +1,53 @@
|
|||||||
|
package com.xiaojukeji.know.streaming.km.task.service.listener;
|
||||||
|
|
||||||
|
import com.didiglobal.logi.log.ILog;
|
||||||
|
import com.didiglobal.logi.log.LogFactory;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.event.cluster.connect.ClusterPhyDeletedEvent;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.component.SpringTool;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
|
||||||
|
import com.xiaojukeji.know.streaming.km.core.service.meta.MetaDataService;
|
||||||
|
import org.springframework.context.ApplicationListener;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
@Service
|
||||||
|
public class TaskClusterDeletedListener implements ApplicationListener<ClusterPhyDeletedEvent> {
|
||||||
|
private static final ILog LOGGER = LogFactory.getLog(TaskClusterDeletedListener.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onApplicationEvent(ClusterPhyDeletedEvent event) {
|
||||||
|
LOGGER.info("method=onApplicationEvent||clusterPhyId={}||msg=listened delete cluster", event.getClusterPhyId());
|
||||||
|
|
||||||
|
// 交由KS自定义的线程池,异步执行任务
|
||||||
|
FutureUtil.quickStartupFutureUtil.submitTask(
|
||||||
|
() -> {
|
||||||
|
// 延迟60秒,避免正在运行的任务,将数据写入DB中
|
||||||
|
BackoffUtils.backoff(60000);
|
||||||
|
|
||||||
|
for (MetaDataService metaDataService: SpringTool.getBeansOfType(MetaDataService.class).values()) {
|
||||||
|
LOGGER.info(
|
||||||
|
"method=onApplicationEvent||clusterPhyId={}||className={}||msg=delete cluster data in db starting",
|
||||||
|
event.getClusterPhyId(), metaDataService.getClass().getSimpleName()
|
||||||
|
);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// 删除数据
|
||||||
|
metaDataService.deleteInDBByKafkaClusterId(event.getClusterPhyId());
|
||||||
|
|
||||||
|
LOGGER.info(
|
||||||
|
"method=onApplicationEvent||clusterPhyId={}||className={}||msg=delete cluster data in db finished",
|
||||||
|
event.getClusterPhyId(), metaDataService.getClass().getSimpleName()
|
||||||
|
);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOGGER.error(
|
||||||
|
"method=onApplicationEvent||clusterPhyId={}||className={}||msg=delete cluster data in db failed||errMsg=exception",
|
||||||
|
event.getClusterPhyId(), metaDataService.getClass().getSimpleName(), e
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user