mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
连接信息写DB优化为批量写入
This commit is contained in:
@@ -26,21 +26,27 @@ public class TopicConnectionServiceImpl implements TopicConnectionService {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private TopicConnectionDao topicConnectionDao;
|
private TopicConnectionDao topicConnectionDao;
|
||||||
|
|
||||||
|
private int splitInterval = 50;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void batchAdd(List<TopicConnectionDO> doList) {
|
public void batchAdd(List<TopicConnectionDO> doList) {
|
||||||
if (ValidateUtils.isEmptyList(doList)) {
|
if (ValidateUtils.isEmptyList(doList)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
int allSize = doList.size();
|
||||||
|
int successSize = 0;
|
||||||
|
|
||||||
int count = 0;
|
int part = doList.size() / splitInterval;
|
||||||
for (TopicConnectionDO connectionDO: doList) {
|
for (int i = 0; i < part; ++i) {
|
||||||
try {
|
List<TopicConnectionDO> subList = doList.subList(0, splitInterval);
|
||||||
count += topicConnectionDao.replace(connectionDO);
|
successSize += topicConnectionDao.batchReplace(subList);
|
||||||
} catch (Exception e) {
|
doList.subList(0, splitInterval).clear();
|
||||||
LOGGER.error("class=TopicConnectionServiceImpl||method=batchAdd||connectionDO={}||errMsg={}", connectionDO, e.getMessage());
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
LOGGER.info("class=TopicConnectionServiceImpl||method=batchAdd||allSize={}||successSize={}", doList.size(), count);
|
if (!ValidateUtils.isEmptyList(doList)) {
|
||||||
|
successSize += topicConnectionDao.batchReplace(doList);
|
||||||
|
}
|
||||||
|
LOGGER.info("class=TopicConnectionServiceImpl||method=batchAdd||allSize={}||successSize={}", allSize, successSize);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|||||||
@@ -13,8 +13,6 @@ import java.util.List;
|
|||||||
public interface TopicConnectionDao {
|
public interface TopicConnectionDao {
|
||||||
int batchReplace(List<TopicConnectionDO> doList);
|
int batchReplace(List<TopicConnectionDO> doList);
|
||||||
|
|
||||||
int replace(TopicConnectionDO topicConnectionDO);
|
|
||||||
|
|
||||||
List<TopicConnectionDO> getByTopicName(Long clusterId, String topicName, Date startTime, Date endTime);
|
List<TopicConnectionDO> getByTopicName(Long clusterId, String topicName, Date startTime, Date endTime);
|
||||||
|
|
||||||
List<TopicConnectionDO> getByAppId(String appId, Date startTime, Date endTime);
|
List<TopicConnectionDO> getByAppId(String appId, Date startTime, Date endTime);
|
||||||
|
|||||||
@@ -27,29 +27,12 @@ public class TopicConnectionDaoImpl implements TopicConnectionDao {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int batchReplace(List<TopicConnectionDO> doList) {
|
public int batchReplace(List<TopicConnectionDO> doList) {
|
||||||
int count = 0;
|
|
||||||
for (TopicConnectionDO elem: doList) {
|
|
||||||
try {
|
|
||||||
count += sqlSession.insert("TopicConnectionDao.replace", elem);
|
|
||||||
} catch (DeadlockLoserDataAccessException e1) {
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOGGER.error("add topic connection info, clusterId:{} topicName:{}."
|
|
||||||
, elem.getClusterId(), elem.getTopicName(), e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return count;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int replace(TopicConnectionDO topicConnectionDO) {
|
|
||||||
try {
|
try {
|
||||||
return sqlSession.insert("TopicConnectionDao.replace", topicConnectionDO);
|
return sqlSession.insert("TopicConnectionDao.batchReplace", doList);
|
||||||
} catch (DeadlockLoserDataAccessException e1) {
|
} catch (DeadlockLoserDataAccessException e1) {
|
||||||
return 0;
|
return 0;
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
LOGGER.error("add topic connection info, clusterId:{} topicName:{}."
|
LOGGER.error("add topic connections info failed", e);
|
||||||
, topicConnectionDO.getClusterId(), topicConnectionDO.getTopicName(), e);
|
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -13,7 +13,7 @@
|
|||||||
<result property="createTime" column="create_time"/>
|
<result property="createTime" column="create_time"/>
|
||||||
</resultMap>
|
</resultMap>
|
||||||
|
|
||||||
<insert id="replace" parameterType="com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO">
|
<insert id="batchReplace" parameterType="java.util.List">
|
||||||
REPLACE INTO topic_connections (
|
REPLACE INTO topic_connections (
|
||||||
cluster_id,
|
cluster_id,
|
||||||
topic_name,
|
topic_name,
|
||||||
@@ -22,16 +22,19 @@
|
|||||||
ip,
|
ip,
|
||||||
client_version,
|
client_version,
|
||||||
create_time
|
create_time
|
||||||
)
|
)
|
||||||
VALUES (
|
VALUES
|
||||||
#{clusterId},
|
<foreach collection="list" item="item" index="index" separator=",">
|
||||||
#{topicName},
|
(
|
||||||
#{type},
|
#{item.clusterId},
|
||||||
#{appId},
|
#{item.topicName},
|
||||||
#{ip},
|
#{item.type},
|
||||||
#{clientVersion},
|
#{item.appId},
|
||||||
#{createTime}
|
#{item.ip},
|
||||||
)
|
#{item.clientVersion},
|
||||||
|
#{item.createTime}
|
||||||
|
)
|
||||||
|
</foreach>
|
||||||
</insert>
|
</insert>
|
||||||
|
|
||||||
<select id="getByTopicName" parameterType="java.util.Map" resultMap="TopicConnectionMap">
|
<select id="getByTopicName" parameterType="java.util.Map" resultMap="TopicConnectionMap">
|
||||||
|
|||||||
Reference in New Issue
Block a user