diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicExpiredService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicExpiredService.java index 273b62c6..9ab963aa 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicExpiredService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicExpiredService.java @@ -13,4 +13,12 @@ public interface TopicExpiredService { List getExpiredTopicDataList(String username); ResultStatus retainExpiredTopic(Long physicalClusterId, String topicName, Integer retainDays); + + /** + * 通过topictopic名称删除 + * @param clusterId 集群id + * @param topicName topic名称 + * @return int + */ + int deleteByTopicName(Long clusterId, String topicName); } \ No newline at end of file diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java index 8a0028c7..594f1aa1 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java @@ -43,6 +43,9 @@ public class AdminServiceImpl implements AdminService { @Autowired private TopicManagerService topicManagerService; + @Autowired + private TopicExpiredService topicExpiredService; + @Autowired private TopicService topicService; @@ -143,6 +146,7 @@ public class AdminServiceImpl implements AdminService { // 3. 数据库中删除topic topicManagerService.deleteByTopicName(clusterDO.getId(), topicName); + topicExpiredService.deleteByTopicName(clusterDO.getId(), topicName); // 4. 数据库中删除authority authorityService.deleteAuthorityByTopic(clusterDO.getId(), topicName); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java index ea9d22da..153576c4 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java @@ -19,6 +19,8 @@ import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.service.*; import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -210,7 +212,7 @@ public class ClusterServiceImpl implements ClusterService { ZooKeeper zk = null; try { - zk = new ZooKeeper(zookeeper, 1000, null); + zk = new ZooKeeper(zookeeper, 1000, watchedEvent -> LOGGER.info(" receive event : " + watchedEvent.getType().name())); for (int i = 0; i < 15; ++i) { if (zk.getState().isConnected()) { // 只有状态是connected的时候,才表示地址是合法的 diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicExpiredServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicExpiredServiceImpl.java index c51e1dcb..d310af1a 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicExpiredServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicExpiredServiceImpl.java @@ -75,4 +75,14 @@ public class TopicExpiredServiceImpl implements TopicExpiredService { } return ResultStatus.MYSQL_ERROR; } + + @Override + public int deleteByTopicName(Long clusterId, String topicName) { + try { + return topicExpiredDao.deleteByName(clusterId, topicName); + } catch (Exception e) { + LOGGER.error("delete topic failed, clusterId:{} topicName:{}", clusterId, topicName, e); + } + return 0; + } } \ No newline at end of file diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicExpiredDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicExpiredDao.java index 18698941..ea189eb4 100644 --- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicExpiredDao.java +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicExpiredDao.java @@ -17,4 +17,6 @@ public interface TopicExpiredDao { int replace(TopicExpiredDO expiredDO); TopicExpiredDO getByTopic(Long clusterId, String topicName); + + int deleteByName(Long clusterId, String topicName); } \ No newline at end of file diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicExpiredDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicExpiredDaoImpl.java index 51853db7..936d4931 100644 --- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicExpiredDaoImpl.java +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicExpiredDaoImpl.java @@ -50,4 +50,12 @@ public class TopicExpiredDaoImpl implements TopicExpiredDao { params.put("topicName", topicName); return sqlSession.selectOne("TopicExpiredDao.getByTopic", params); } + + @Override + public int deleteByName(Long clusterId, String topicName) { + Map params = new HashMap<>(2); + params.put("clusterId", clusterId); + params.put("topicName", topicName); + return sqlSession.delete("TopicExpiredDao.deleteByName", params); + } } \ No newline at end of file diff --git a/kafka-manager-dao/src/main/resources/mapper/TopicExpiredDao.xml b/kafka-manager-dao/src/main/resources/mapper/TopicExpiredDao.xml index 39ebf8ca..1da6753a 100644 --- a/kafka-manager-dao/src/main/resources/mapper/TopicExpiredDao.xml +++ b/kafka-manager-dao/src/main/resources/mapper/TopicExpiredDao.xml @@ -36,4 +36,8 @@ + + + DELETE FROM topic_expired WHERE cluster_id=#{clusterId} AND topic_name=#{topicName} +