mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 20:22:12 +08:00
[Optimize]优化Topic元信息更新策略(#806)
This commit is contained in:
@@ -22,6 +22,7 @@ public interface TopicService {
|
||||
* 从DB获取数据
|
||||
*/
|
||||
List<Topic> listTopicsFromDB(Long clusterPhyId);
|
||||
List<TopicPO> listTopicPOsFromDB(Long clusterPhyId);
|
||||
Topic getTopic(Long clusterPhyId, String topicName);
|
||||
List<String> listRecentUpdateTopicNamesFromDB(Long clusterPhyId, Integer time); // 获取集群最近新增Topic的topic名称:time单位为秒
|
||||
|
||||
@@ -39,6 +40,6 @@ public interface TopicService {
|
||||
int addNewTopic2DB(TopicPO po);
|
||||
int deleteTopicInDB(Long clusterPhyId, String topicName);
|
||||
void batchReplaceMetadata(Long clusterPhyId, List<Topic> presentTopicList);
|
||||
int batchReplaceConfig(Long clusterPhyId, List<TopicConfig> topicConfigList);
|
||||
int batchReplaceChangedConfig(Long clusterPhyId, List<TopicConfig> topicConfigList);
|
||||
Result<Void> updatePartitionNum(Long clusterPhyId, String topicName, Integer partitionNum);
|
||||
}
|
||||
|
||||
@@ -10,7 +10,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.param.config.KafkaTop
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.kafka.*;
|
||||
@@ -185,11 +184,9 @@ public class TopicConfigServiceImpl extends BaseVersionControlService implements
|
||||
|
||||
private Result<Properties> getTopicConfigByZKClient(Long clusterPhyId, String topicName) {
|
||||
try {
|
||||
Topic topic = topicService.getTopic(clusterPhyId, topicName);
|
||||
|
||||
KafkaZkClient kafkaZkClient = kafkaAdminZKClient.getClient(clusterPhyId);
|
||||
|
||||
Properties properties = kafkaZkClient.getEntityConfigs("topics", topic.getTopicName());
|
||||
Properties properties = kafkaZkClient.getEntityConfigs("topics", topicName);
|
||||
for (Object key: properties.keySet()) {
|
||||
properties.getProperty((String) key);
|
||||
}
|
||||
@@ -209,12 +206,10 @@ public class TopicConfigServiceImpl extends BaseVersionControlService implements
|
||||
try {
|
||||
AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId());
|
||||
|
||||
Topic metadata = topicService.getTopic(param.getClusterPhyId(), param.getTopicName());
|
||||
|
||||
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, metadata.getTopicName());
|
||||
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, param.getTopicName());
|
||||
DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(
|
||||
Arrays.asList(configResource),
|
||||
buildDescribeConfigsOptions()
|
||||
Collections.singletonList(configResource),
|
||||
buildDescribeConfigsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)
|
||||
);
|
||||
|
||||
Map<ConfigResource, Config> configMap = describeConfigsResult.all().get();
|
||||
|
||||
@@ -101,7 +101,15 @@ public class TopicServiceImpl implements TopicService {
|
||||
|
||||
@Override
|
||||
public List<Topic> listTopicsFromDB(Long clusterPhyId) {
|
||||
return TopicConverter.convert2TopicList(this.getTopicsFromDB(clusterPhyId));
|
||||
return TopicConverter.convert2TopicList(this.listTopicPOsFromDB(clusterPhyId));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TopicPO> listTopicPOsFromDB(Long clusterPhyId) {
|
||||
LambdaQueryWrapper<TopicPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
lambdaQueryWrapper.eq(TopicPO::getClusterPhyId, clusterPhyId);
|
||||
|
||||
return topicDAO.selectList(lambdaQueryWrapper);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -182,39 +190,46 @@ public class TopicServiceImpl implements TopicService {
|
||||
|
||||
@Override
|
||||
public void batchReplaceMetadata(Long clusterPhyId, List<Topic> presentTopicList) {
|
||||
Map<String, Topic> presentTopicMap = presentTopicList.stream().collect(Collectors.toMap(Topic::getTopicName, Function.identity()));
|
||||
|
||||
List<TopicPO> dbTopicPOList = this.getTopicsFromDB(clusterPhyId);
|
||||
Map<String, TopicPO> inDBMap = this.listTopicPOsFromDB(clusterPhyId).stream().collect(Collectors.toMap(TopicPO::getTopicName, Function.identity()));
|
||||
|
||||
// 新旧合并
|
||||
for (TopicPO dbTopicPO: dbTopicPOList) {
|
||||
Topic topic = presentTopicMap.remove(dbTopicPO.getTopicName());
|
||||
if (topic == null) {
|
||||
topicDAO.deleteById(dbTopicPO.getId());
|
||||
continue;
|
||||
}
|
||||
|
||||
topicDAO.updateById(TopicConverter.mergeAndOnlyMetadata2NewTopicPO(topic, dbTopicPO));
|
||||
}
|
||||
|
||||
// DB中没有的则插入DB
|
||||
for (Topic topic: presentTopicMap.values()) {
|
||||
for (Topic presentTopic: presentTopicList) {
|
||||
try {
|
||||
topicDAO.insert(TopicConverter.mergeAndOnlyMetadata2NewTopicPO(topic, null));
|
||||
TopicPO inDBTopicPO = inDBMap.remove(presentTopic.getTopicName());
|
||||
|
||||
TopicPO newTopicPO = TopicConverter.mergeAndOnlyMetadata2NewTopicPO(presentTopic, inDBTopicPO);
|
||||
if (inDBTopicPO == null) {
|
||||
topicDAO.insert(newTopicPO);
|
||||
} else if (!newTopicPO.equals(inDBTopicPO)) {
|
||||
// 有变化时,则进行更新
|
||||
if (presentTopic.getUpdateTime() == null) {
|
||||
// 如果原数据的更新时间为null,则修改为当前时间
|
||||
newTopicPO.setUpdateTime(new Date());
|
||||
}
|
||||
topicDAO.updateById(newTopicPO);
|
||||
}
|
||||
|
||||
// 无变化时,直接忽略更新
|
||||
} catch (DuplicateKeyException dke) {
|
||||
// 忽略key冲突错误,多台KM可能同时做insert,所以可能出现key冲突
|
||||
}
|
||||
}
|
||||
|
||||
// DB中没有的则进行删除
|
||||
inDBMap.values().forEach(elem -> topicDAO.deleteById(elem.getId()));
|
||||
}
|
||||
|
||||
@Override
|
||||
public int batchReplaceConfig(Long clusterPhyId, List<TopicConfig> topicConfigList) {
|
||||
public int batchReplaceChangedConfig(Long clusterPhyId, List<TopicConfig> changedConfigList) {
|
||||
int effectRow = 0;
|
||||
for (TopicConfig config: topicConfigList) {
|
||||
for (TopicConfig config: changedConfigList) {
|
||||
try {
|
||||
effectRow += topicDAO.updateConfig(ConvertUtil.obj2Obj(config, TopicPO.class));
|
||||
effectRow += topicDAO.updateConfigById(ConvertUtil.obj2Obj(config, TopicPO.class));
|
||||
} catch (Exception e) {
|
||||
log.error("method=batchReplaceConfig||config={}||errMsg=exception!", config, e);
|
||||
log.error(
|
||||
"method=batchReplaceConfig||clusterPhyId={}||topicName={}||retentionMs={}||errMsg=exception!",
|
||||
config.getClusterPhyId(), config.getTopicName(), config.getRetentionMs(), e
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -299,11 +314,4 @@ public class TopicServiceImpl implements TopicService {
|
||||
|
||||
return topicDAO.selectOne(lambdaQueryWrapper);
|
||||
}
|
||||
|
||||
private List<TopicPO> getTopicsFromDB(Long clusterPhyId) {
|
||||
LambdaQueryWrapper<TopicPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
lambdaQueryWrapper.eq(TopicPO::getClusterPhyId, clusterPhyId);
|
||||
|
||||
return topicDAO.selectList(lambdaQueryWrapper);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user