修复"新添加集群的时候,报watch的空指针异常"问题 & 修复"删除废弃Topic之后,Topic资源治理没有同步删除"问题

This commit is contained in:
xuguang
2021-11-19 19:27:19 +08:00
parent 6b1e944bba
commit cbf17d4eb5
7 changed files with 39 additions and 1 deletions

View File

@@ -13,4 +13,12 @@ public interface TopicExpiredService {
List<TopicExpiredData> getExpiredTopicDataList(String username);
ResultStatus retainExpiredTopic(Long physicalClusterId, String topicName, Integer retainDays);
/**
* 通过topictopic名称删除
* @param clusterId 集群id
* @param topicName topic名称
* @return int
*/
int deleteByTopicName(Long clusterId, String topicName);
}

View File

@@ -43,6 +43,9 @@ public class AdminServiceImpl implements AdminService {
@Autowired
private TopicManagerService topicManagerService;
@Autowired
private TopicExpiredService topicExpiredService;
@Autowired
private TopicService topicService;
@@ -143,6 +146,7 @@ public class AdminServiceImpl implements AdminService {
// 3. 数据库中删除topic
topicManagerService.deleteByTopicName(clusterDO.getId(), topicName);
topicExpiredService.deleteByTopicName(clusterDO.getId(), topicName);
// 4. 数据库中删除authority
authorityService.deleteAuthorityByTopic(clusterDO.getId(), topicName);

View File

@@ -19,6 +19,8 @@ import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.*;
import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -210,7 +212,7 @@ public class ClusterServiceImpl implements ClusterService {
ZooKeeper zk = null;
try {
zk = new ZooKeeper(zookeeper, 1000, null);
zk = new ZooKeeper(zookeeper, 1000, watchedEvent -> LOGGER.info(" receive event : " + watchedEvent.getType().name()));
for (int i = 0; i < 15; ++i) {
if (zk.getState().isConnected()) {
// 只有状态是connected的时候才表示地址是合法的

View File

@@ -75,4 +75,14 @@ public class TopicExpiredServiceImpl implements TopicExpiredService {
}
return ResultStatus.MYSQL_ERROR;
}
@Override
public int deleteByTopicName(Long clusterId, String topicName) {
try {
return topicExpiredDao.deleteByName(clusterId, topicName);
} catch (Exception e) {
LOGGER.error("delete topic failed, clusterId:{} topicName:{}", clusterId, topicName, e);
}
return 0;
}
}

View File

@@ -17,4 +17,6 @@ public interface TopicExpiredDao {
int replace(TopicExpiredDO expiredDO);
TopicExpiredDO getByTopic(Long clusterId, String topicName);
int deleteByName(Long clusterId, String topicName);
}

View File

@@ -50,4 +50,12 @@ public class TopicExpiredDaoImpl implements TopicExpiredDao {
params.put("topicName", topicName);
return sqlSession.selectOne("TopicExpiredDao.getByTopic", params);
}
@Override
public int deleteByName(Long clusterId, String topicName) {
Map<String, Object> params = new HashMap<>(2);
params.put("clusterId", clusterId);
params.put("topicName", topicName);
return sqlSession.delete("TopicExpiredDao.deleteByName", params);
}
}

View File

@@ -36,4 +36,8 @@
<select id="getByTopic" parameterType="java.util.Map" resultMap="TopicExpiredMap">
SELECT * FROM topic_expired WHERE cluster_id = #{clusterId} AND topic_name = #{topicName}
</select>
<delete id="deleteByName" parameterType="java.util.Map">
DELETE FROM topic_expired WHERE cluster_id=#{clusterId} AND topic_name=#{topicName}
</delete>
</mapper>