From cbf17d4eb5c18dfa3a2ed65e6d98f5ad5c6ef977 Mon Sep 17 00:00:00 2001 From: xuguang Date: Fri, 19 Nov 2021 19:27:19 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D"=E6=96=B0=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E9=9B=86=E7=BE=A4=E7=9A=84=E6=97=B6=E5=80=99=EF=BC=8C=E6=8A=A5?= =?UTF-8?q?watch=E7=9A=84=E7=A9=BA=E6=8C=87=E9=92=88=E5=BC=82=E5=B8=B8"?= =?UTF-8?q?=E9=97=AE=E9=A2=98=20&=20=E4=BF=AE=E5=A4=8D"=E5=88=A0=E9=99=A4?= =?UTF-8?q?=E5=BA=9F=E5=BC=83Topic=E4=B9=8B=E5=90=8E=EF=BC=8CTopic?= =?UTF-8?q?=E8=B5=84=E6=BA=90=E6=B2=BB=E7=90=86=E6=B2=A1=E6=9C=89=E5=90=8C?= =?UTF-8?q?=E6=AD=A5=E5=88=A0=E9=99=A4"=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../manager/service/service/TopicExpiredService.java | 8 ++++++++ .../manager/service/service/impl/AdminServiceImpl.java | 4 ++++ .../service/service/impl/ClusterServiceImpl.java | 4 +++- .../service/service/impl/TopicExpiredServiceImpl.java | 10 ++++++++++ .../xiaojukeji/kafka/manager/dao/TopicExpiredDao.java | 2 ++ .../kafka/manager/dao/impl/TopicExpiredDaoImpl.java | 8 ++++++++ .../src/main/resources/mapper/TopicExpiredDao.xml | 4 ++++ 7 files changed, 39 insertions(+), 1 deletion(-) diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicExpiredService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicExpiredService.java index 273b62c6..9ab963aa 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicExpiredService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/TopicExpiredService.java @@ -13,4 +13,12 @@ public interface TopicExpiredService { List getExpiredTopicDataList(String username); ResultStatus retainExpiredTopic(Long physicalClusterId, String topicName, Integer retainDays); + + /** + * 通过topictopic名称删除 + * @param clusterId 集群id + * @param topicName topic名称 + * @return int + */ + int deleteByTopicName(Long clusterId, String topicName); } \ No newline at end of file diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java index 8a0028c7..594f1aa1 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/AdminServiceImpl.java @@ -43,6 +43,9 @@ public class AdminServiceImpl implements AdminService { @Autowired private TopicManagerService topicManagerService; + @Autowired + private TopicExpiredService topicExpiredService; + @Autowired private TopicService topicService; @@ -143,6 +146,7 @@ public class AdminServiceImpl implements AdminService { // 3. 数据库中删除topic topicManagerService.deleteByTopicName(clusterDO.getId(), topicName); + topicExpiredService.deleteByTopicName(clusterDO.getId(), topicName); // 4. 数据库中删除authority authorityService.deleteAuthorityByTopic(clusterDO.getId(), topicName); diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java index ea9d22da..153576c4 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java @@ -19,6 +19,8 @@ import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.service.*; import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -210,7 +212,7 @@ public class ClusterServiceImpl implements ClusterService { ZooKeeper zk = null; try { - zk = new ZooKeeper(zookeeper, 1000, null); + zk = new ZooKeeper(zookeeper, 1000, watchedEvent -> LOGGER.info(" receive event : " + watchedEvent.getType().name())); for (int i = 0; i < 15; ++i) { if (zk.getState().isConnected()) { // 只有状态是connected的时候,才表示地址是合法的 diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicExpiredServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicExpiredServiceImpl.java index c51e1dcb..d310af1a 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicExpiredServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/TopicExpiredServiceImpl.java @@ -75,4 +75,14 @@ public class TopicExpiredServiceImpl implements TopicExpiredService { } return ResultStatus.MYSQL_ERROR; } + + @Override + public int deleteByTopicName(Long clusterId, String topicName) { + try { + return topicExpiredDao.deleteByName(clusterId, topicName); + } catch (Exception e) { + LOGGER.error("delete topic failed, clusterId:{} topicName:{}", clusterId, topicName, e); + } + return 0; + } } \ No newline at end of file diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicExpiredDao.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicExpiredDao.java index 18698941..ea189eb4 100644 --- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicExpiredDao.java +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/TopicExpiredDao.java @@ -17,4 +17,6 @@ public interface TopicExpiredDao { int replace(TopicExpiredDO expiredDO); TopicExpiredDO getByTopic(Long clusterId, String topicName); + + int deleteByName(Long clusterId, String topicName); } \ No newline at end of file diff --git a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicExpiredDaoImpl.java b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicExpiredDaoImpl.java index 51853db7..936d4931 100644 --- a/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicExpiredDaoImpl.java +++ b/kafka-manager-dao/src/main/java/com/xiaojukeji/kafka/manager/dao/impl/TopicExpiredDaoImpl.java @@ -50,4 +50,12 @@ public class TopicExpiredDaoImpl implements TopicExpiredDao { params.put("topicName", topicName); return sqlSession.selectOne("TopicExpiredDao.getByTopic", params); } + + @Override + public int deleteByName(Long clusterId, String topicName) { + Map params = new HashMap<>(2); + params.put("clusterId", clusterId); + params.put("topicName", topicName); + return sqlSession.delete("TopicExpiredDao.deleteByName", params); + } } \ No newline at end of file diff --git a/kafka-manager-dao/src/main/resources/mapper/TopicExpiredDao.xml b/kafka-manager-dao/src/main/resources/mapper/TopicExpiredDao.xml index 39ebf8ca..1da6753a 100644 --- a/kafka-manager-dao/src/main/resources/mapper/TopicExpiredDao.xml +++ b/kafka-manager-dao/src/main/resources/mapper/TopicExpiredDao.xml @@ -36,4 +36,8 @@ + + + DELETE FROM topic_expired WHERE cluster_id=#{clusterId} AND topic_name=#{topicName} +