mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 20:22:12 +08:00
当使用 PostgreSQL 数据库时,使用 insert on conflict 替代 MySQL 的 replace SQL语句。
This commit is contained in:
@@ -19,12 +19,18 @@ public class BrokerDaoImpl implements BrokerDao {
|
||||
@Autowired
|
||||
private SqlSessionTemplate sqlSession;
|
||||
|
||||
@Autowired
|
||||
private KafkaManagerProperties kafkaManagerProperties;
|
||||
|
||||
public void setSqlSession(SqlSessionTemplate sqlSession) {
|
||||
this.sqlSession = sqlSession;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int replace(BrokerDO brokerInfoDO) {
|
||||
if (kafkaManagerProperties.hasPG()) {
|
||||
return sqlSession.insert("BrokerDao.replaceOnPG", brokerInfoDO);
|
||||
}
|
||||
return sqlSession.insert("BrokerDao.replace", brokerInfoDO);
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
package com.xiaojukeji.kafka.manager.dao.impl;
|
||||
|
||||
import org.springframework.boot.context.properties.ConfigurationProperties;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Component
|
||||
@ConfigurationProperties("spring.datasource.kafka-manager")
|
||||
public class KafkaManagerProperties {
|
||||
private String jdbcUrl;
|
||||
|
||||
public String getJdbcUrl() {
|
||||
return jdbcUrl;
|
||||
}
|
||||
|
||||
public void setJdbcUrl(String jdbcUrl) {
|
||||
this.jdbcUrl = jdbcUrl;
|
||||
}
|
||||
|
||||
public boolean hasPG() {
|
||||
return jdbcUrl.startsWith("jdbc:postgres");
|
||||
}
|
||||
}
|
||||
@@ -17,12 +17,18 @@ public class RegionDaoImpl implements RegionDao {
|
||||
@Autowired
|
||||
private SqlSessionTemplate sqlSession;
|
||||
|
||||
@Autowired
|
||||
private KafkaManagerProperties kafkaManagerProperties;
|
||||
|
||||
public void setSqlSession(SqlSessionTemplate sqlSession) {
|
||||
this.sqlSession = sqlSession;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int insert(RegionDO regionDO) {
|
||||
if (kafkaManagerProperties.hasPG()) {
|
||||
return sqlSession.insert("RegionDao.insertOnPG", regionDO);
|
||||
}
|
||||
return sqlSession.insert("RegionDao.insert", regionDO);
|
||||
}
|
||||
|
||||
|
||||
@@ -19,12 +19,18 @@ public class TopicDaoImpl implements TopicDao {
|
||||
@Autowired
|
||||
private SqlSessionTemplate sqlSession;
|
||||
|
||||
@Autowired
|
||||
private KafkaManagerProperties kafkaManagerProperties;
|
||||
|
||||
public void setSqlSession(SqlSessionTemplate sqlSession) {
|
||||
this.sqlSession = sqlSession;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int replace(TopicDO topicDO) {
|
||||
if (kafkaManagerProperties.hasPG()) {
|
||||
return sqlSession.insert("TopicDao.replaceOnPG", topicDO);
|
||||
}
|
||||
return sqlSession.insert("TopicDao.replace", topicDO);
|
||||
}
|
||||
|
||||
|
||||
@@ -25,12 +25,18 @@ public class TopicFavoriteDaoImpl implements TopicFavoriteDao {
|
||||
@Autowired
|
||||
private TransactionTemplate transactionTemplate;
|
||||
|
||||
@Autowired
|
||||
private KafkaManagerProperties kafkaManagerProperties;
|
||||
|
||||
public void setSqlSession(SqlSessionTemplate sqlSession) {
|
||||
this.sqlSession = sqlSession;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int batchAdd(List<TopicFavoriteDO> topicFavoriteDOList) {
|
||||
if (kafkaManagerProperties.hasPG()) {
|
||||
return sqlSession.insert("TopicFavoriteDao.batchAddOnPG", topicFavoriteDOList);
|
||||
}
|
||||
return sqlSession.insert("TopicFavoriteDao.batchAdd", topicFavoriteDOList);
|
||||
}
|
||||
|
||||
|
||||
@@ -20,6 +20,16 @@
|
||||
(#{clusterId}, #{brokerId}, #{host}, #{port}, #{timestamp}, #{status})
|
||||
</insert>
|
||||
|
||||
<insert id="replaceOnPG" parameterType="BrokerDO">
|
||||
insert into broker
|
||||
(cluster_id, broker_id, host, port, timestamp, status)
|
||||
values
|
||||
on conflict (cluster_id, broker_id) do update set host = excluded.host,
|
||||
port = excluded.port,
|
||||
timestamp = excluded.timestamp,
|
||||
status = excluded.status
|
||||
</insert>
|
||||
|
||||
<delete id="deleteById" parameterType="java.util.Map">
|
||||
DELETE FROM broker WHERE cluster_id = #{clusterId} AND broker_id = #{brokerId}
|
||||
</delete>
|
||||
|
||||
@@ -21,6 +21,15 @@
|
||||
(#{regionName}, #{clusterId}, #{brokerList}, #{description}, #{operator})
|
||||
</insert>
|
||||
|
||||
<insert id="insertOnPG" parameterType="com.xiaojukeji.kafka.manager.common.entity.po.RegionDO">
|
||||
insert into region
|
||||
(region_name, cluster_id, broker_list, description, operator)
|
||||
values (#{regionName}, #{clusterId}, #{brokerList}, #{description}, #{operator})
|
||||
on conflict (region_name, cluster_id) do update set broker_list = excluded.broker_list,
|
||||
description = excluded.description,
|
||||
operator = excluded.operator
|
||||
</insert>
|
||||
|
||||
<delete id="deleteById" parameterType="java.lang.Long">
|
||||
DELETE FROM region WHERE id = #{id}
|
||||
</delete>
|
||||
|
||||
@@ -19,6 +19,15 @@
|
||||
(#{clusterId}, #{topicName}, #{principals}, #{description}, #{status})
|
||||
</insert>
|
||||
|
||||
<insert id="replaceOnPG" parameterType="com.xiaojukeji.kafka.manager.common.entity.po.TopicDO">
|
||||
insert into topic
|
||||
(cluster_id, topic_name, principals, description, status)
|
||||
values (#{clusterId}, #{topicName}, #{principals}, #{description}, #{status})
|
||||
on conflict (cluster_id, topic_name) do update set principals = excluded.principals,
|
||||
description = excluded.description,
|
||||
status = excluded.status
|
||||
</insert>
|
||||
|
||||
<delete id="deleteById" parameterType="java.lang.Long">
|
||||
DELETE FROM topic WHERE id = #{id}
|
||||
</delete>
|
||||
|
||||
@@ -21,6 +21,15 @@
|
||||
</foreach>
|
||||
</insert>
|
||||
|
||||
<insert id="batchAddOnPG" parameterType="java.util.List">
|
||||
insert into topic_favorite (cluster_id, topic_name, username)
|
||||
values
|
||||
<foreach item="TopicFavoriteDO" index="index" collection="list" separator=",">
|
||||
(#{TopicFavoriteDO.clusterId}, #{TopicFavoriteDO.topicName}, #{TopicFavoriteDO.username})
|
||||
</foreach>
|
||||
on conflict (cluster_id, topic_name, username) do update set gmt_modify = now();
|
||||
</insert>
|
||||
|
||||
<delete id="deleteById" parameterType="java.lang.Long">
|
||||
DELETE FROM topic_favorite WHERE id=#{id}
|
||||
</delete>
|
||||
|
||||
@@ -76,6 +76,7 @@ public class AdminTopicServiceImpl implements AdminTopicService {
|
||||
OperationHistoryDO operationHistoryDO = OperationHistoryDO.newInstance(topicDO.getClusterId(), topicDO.getTopicName(), operator, OperationEnum.CREATE_TOPIC.message);
|
||||
operationHistoryDao.insert(operationHistoryDO);
|
||||
topicDao.replace(topicDO);
|
||||
|
||||
} catch (Exception e) {
|
||||
return AdminTopicStatusEnum.REPLACE_DB_FAILED;
|
||||
}
|
||||
@@ -188,4 +189,5 @@ public class AdminTopicServiceImpl implements AdminTopicService {
|
||||
}
|
||||
return AdminTopicStatusEnum.SUCCESS;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -18,9 +18,6 @@ spring:
|
||||
main:
|
||||
allow-bean-definition-overriding: true
|
||||
|
||||
profiles:
|
||||
active: dev
|
||||
|
||||
logging:
|
||||
config: classpath:logback-spring.xml
|
||||
|
||||
|
||||
@@ -22,9 +22,6 @@ spring:
|
||||
main:
|
||||
allow-bean-definition-overriding: true
|
||||
|
||||
profiles:
|
||||
active: dev
|
||||
|
||||
logging:
|
||||
config: classpath:logback-spring.xml
|
||||
|
||||
|
||||
Reference in New Issue
Block a user