diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/topic/TopicConfig.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/topic/TopicConfig.java index 162f5bcf..5fde198a 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/topic/TopicConfig.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/topic/TopicConfig.java @@ -14,6 +14,11 @@ import java.io.Serializable; @NoArgsConstructor @AllArgsConstructor public class TopicConfig implements Serializable { + /** + * 表主键ID + */ + private Long id; + /** * 物理集群ID */ diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/topic/TopicPO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/topic/TopicPO.java index 09ccbc16..8e14a313 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/topic/TopicPO.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/po/topic/TopicPO.java @@ -5,6 +5,8 @@ import com.xiaojukeji.know.streaming.km.common.bean.po.BasePO; import com.xiaojukeji.know.streaming.km.common.constant.Constant; import lombok.Data; +import java.util.Objects; + @Data @TableName(Constant.MYSQL_TABLE_NAME_PREFIX + "topic") public class TopicPO extends BasePO { @@ -52,4 +54,35 @@ public class TopicPO extends BasePO { * 备注信息 */ private String description; + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + if (!super.equals(o)) { + return false; + } + + TopicPO topicPO = (TopicPO) o; + return Objects.equals(clusterPhyId, topicPO.clusterPhyId) + && Objects.equals(topicName, topicPO.topicName) + && Objects.equals(replicaNum, topicPO.replicaNum) + && Objects.equals(partitionNum, topicPO.partitionNum) + && Objects.equals(brokerIds, topicPO.brokerIds) + && Objects.equals(partitionMap, topicPO.partitionMap) + && Objects.equals(retentionMs, topicPO.retentionMs) + && Objects.equals(type, topicPO.type) + && Objects.equals(description, topicPO.description); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), clusterPhyId, topicName, replicaNum, partitionNum, brokerIds, partitionMap, retentionMs, type, description); + } } \ No newline at end of file diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/TopicConverter.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/TopicConverter.java index c935c7f4..9ad4ed46 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/TopicConverter.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/converter/TopicConverter.java @@ -55,10 +55,6 @@ public class TopicConverter { * 仅合并Topic的元信息部分,业务信息和配置信息部分不合并 */ public static TopicPO mergeAndOnlyMetadata2NewTopicPO(Topic newTopicData, TopicPO oldDBTopicPO) { - if (newTopicData == null) { - return null; - } - TopicPO newTopicPO = new TopicPO(); newTopicPO.setId(oldDBTopicPO != null? oldDBTopicPO.getId(): null); @@ -68,6 +64,7 @@ public class TopicConverter { newTopicPO.setReplicaNum(newTopicData.getReplicaNum()); newTopicPO.setBrokerIds(CommonUtils.intList2String(new ArrayList<>(newTopicData.getBrokerIdSet()))); newTopicPO.setType(newTopicData.getType()); + newTopicPO.setPartitionMap(ConvertUtil.obj2Json(newTopicData.getPartitionMap())); if (newTopicData.getCreateTime() != null) { newTopicPO.setCreateTime(new Date(newTopicData.getCreateTime())); @@ -77,8 +74,8 @@ public class TopicConverter { newTopicPO.setUpdateTime(oldDBTopicPO != null? oldDBTopicPO.getUpdateTime(): new Date()); } - newTopicPO.setPartitionMap(ConvertUtil.obj2Json(newTopicData.getPartitionMap())); - + newTopicPO.setDescription(oldDBTopicPO != null? oldDBTopicPO.getDescription(): null); + newTopicPO.setRetentionMs(oldDBTopicPO != null? oldDBTopicPO.getRetentionMs(): null); return newTopicPO; } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/TopicService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/TopicService.java index 14aa4a93..cd5d5ded 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/TopicService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/TopicService.java @@ -22,6 +22,7 @@ public interface TopicService { * 从DB获取数据 */ List listTopicsFromDB(Long clusterPhyId); + List listTopicPOsFromDB(Long clusterPhyId); Topic getTopic(Long clusterPhyId, String topicName); List listRecentUpdateTopicNamesFromDB(Long clusterPhyId, Integer time); // 获取集群最近新增Topic的topic名称:time单位为秒 @@ -39,6 +40,6 @@ public interface TopicService { int addNewTopic2DB(TopicPO po); int deleteTopicInDB(Long clusterPhyId, String topicName); void batchReplaceMetadata(Long clusterPhyId, List presentTopicList); - int batchReplaceConfig(Long clusterPhyId, List topicConfigList); + int batchReplaceChangedConfig(Long clusterPhyId, List topicConfigList); Result updatePartitionNum(Long clusterPhyId, String topicName, Integer partitionNum); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicConfigServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicConfigServiceImpl.java index 2b089c67..0149b5d4 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicConfigServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicConfigServiceImpl.java @@ -10,7 +10,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.param.config.KafkaTop import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; -import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.constant.kafka.*; @@ -185,11 +184,9 @@ public class TopicConfigServiceImpl extends BaseVersionControlService implements private Result getTopicConfigByZKClient(Long clusterPhyId, String topicName) { try { - Topic topic = topicService.getTopic(clusterPhyId, topicName); - KafkaZkClient kafkaZkClient = kafkaAdminZKClient.getClient(clusterPhyId); - Properties properties = kafkaZkClient.getEntityConfigs("topics", topic.getTopicName()); + Properties properties = kafkaZkClient.getEntityConfigs("topics", topicName); for (Object key: properties.keySet()) { properties.getProperty((String) key); } @@ -209,12 +206,10 @@ public class TopicConfigServiceImpl extends BaseVersionControlService implements try { AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId()); - Topic metadata = topicService.getTopic(param.getClusterPhyId(), param.getTopicName()); - - ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, metadata.getTopicName()); + ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, param.getTopicName()); DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs( - Arrays.asList(configResource), - buildDescribeConfigsOptions() + Collections.singletonList(configResource), + buildDescribeConfigsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS) ); Map configMap = describeConfigsResult.all().get(); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java index 2689373c..ac101c61 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/topic/impl/TopicServiceImpl.java @@ -101,7 +101,15 @@ public class TopicServiceImpl implements TopicService { @Override public List listTopicsFromDB(Long clusterPhyId) { - return TopicConverter.convert2TopicList(this.getTopicsFromDB(clusterPhyId)); + return TopicConverter.convert2TopicList(this.listTopicPOsFromDB(clusterPhyId)); + } + + @Override + public List listTopicPOsFromDB(Long clusterPhyId) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(TopicPO::getClusterPhyId, clusterPhyId); + + return topicDAO.selectList(lambdaQueryWrapper); } @Override @@ -182,39 +190,46 @@ public class TopicServiceImpl implements TopicService { @Override public void batchReplaceMetadata(Long clusterPhyId, List presentTopicList) { - Map presentTopicMap = presentTopicList.stream().collect(Collectors.toMap(Topic::getTopicName, Function.identity())); - - List dbTopicPOList = this.getTopicsFromDB(clusterPhyId); + Map inDBMap = this.listTopicPOsFromDB(clusterPhyId).stream().collect(Collectors.toMap(TopicPO::getTopicName, Function.identity())); // 新旧合并 - for (TopicPO dbTopicPO: dbTopicPOList) { - Topic topic = presentTopicMap.remove(dbTopicPO.getTopicName()); - if (topic == null) { - topicDAO.deleteById(dbTopicPO.getId()); - continue; - } - - topicDAO.updateById(TopicConverter.mergeAndOnlyMetadata2NewTopicPO(topic, dbTopicPO)); - } - - // DB中没有的则插入DB - for (Topic topic: presentTopicMap.values()) { + for (Topic presentTopic: presentTopicList) { try { - topicDAO.insert(TopicConverter.mergeAndOnlyMetadata2NewTopicPO(topic, null)); + TopicPO inDBTopicPO = inDBMap.remove(presentTopic.getTopicName()); + + TopicPO newTopicPO = TopicConverter.mergeAndOnlyMetadata2NewTopicPO(presentTopic, inDBTopicPO); + if (inDBTopicPO == null) { + topicDAO.insert(newTopicPO); + } else if (!newTopicPO.equals(inDBTopicPO)) { + // 有变化时,则进行更新 + if (presentTopic.getUpdateTime() == null) { + // 如果原数据的更新时间为null,则修改为当前时间 + newTopicPO.setUpdateTime(new Date()); + } + topicDAO.updateById(newTopicPO); + } + + // 无变化时,直接忽略更新 } catch (DuplicateKeyException dke) { // 忽略key冲突错误,多台KM可能同时做insert,所以可能出现key冲突 } } + + // DB中没有的则进行删除 + inDBMap.values().forEach(elem -> topicDAO.deleteById(elem.getId())); } @Override - public int batchReplaceConfig(Long clusterPhyId, List topicConfigList) { + public int batchReplaceChangedConfig(Long clusterPhyId, List changedConfigList) { int effectRow = 0; - for (TopicConfig config: topicConfigList) { + for (TopicConfig config: changedConfigList) { try { - effectRow += topicDAO.updateConfig(ConvertUtil.obj2Obj(config, TopicPO.class)); + effectRow += topicDAO.updateConfigById(ConvertUtil.obj2Obj(config, TopicPO.class)); } catch (Exception e) { - log.error("method=batchReplaceConfig||config={}||errMsg=exception!", config, e); + log.error( + "method=batchReplaceConfig||clusterPhyId={}||topicName={}||retentionMs={}||errMsg=exception!", + config.getClusterPhyId(), config.getTopicName(), config.getRetentionMs(), e + ); } } @@ -299,11 +314,4 @@ public class TopicServiceImpl implements TopicService { return topicDAO.selectOne(lambdaQueryWrapper); } - - private List getTopicsFromDB(Long clusterPhyId) { - LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); - lambdaQueryWrapper.eq(TopicPO::getClusterPhyId, clusterPhyId); - - return topicDAO.selectList(lambdaQueryWrapper); - } } diff --git a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/topic/TopicDAO.java b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/topic/TopicDAO.java index 79437029..7a2dc15f 100644 --- a/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/topic/TopicDAO.java +++ b/km-persistence/src/main/java/com/xiaojukeji/know/streaming/km/persistence/mysql/topic/TopicDAO.java @@ -8,5 +8,5 @@ import org.springframework.stereotype.Repository; public interface TopicDAO extends BaseMapper { int replaceAll(TopicPO topicPO); - int updateConfig(TopicPO topicPO); + int updateConfigById(TopicPO topicPO); } diff --git a/km-persistence/src/main/resources/mybatis/TopicMapper.xml b/km-persistence/src/main/resources/mybatis/TopicMapper.xml index 66e5b629..d13ecd52 100644 --- a/km-persistence/src/main/resources/mybatis/TopicMapper.xml +++ b/km-persistence/src/main/resources/mybatis/TopicMapper.xml @@ -25,8 +25,8 @@ (#{clusterPhyId}, #{topicName}, #{replicaNum}, #{partitionNum}, #{brokerIds}, #{partitionMap}, #{retentionMs}, #{type}, #{description}) - - UPDATE ks_km_topic SET retention_ms = #{retentionMs} WHERE cluster_phy_id = #{clusterPhyId} AND topic_name = #{topicName} + + UPDATE ks_km_topic SET retention_ms = #{retentionMs} WHERE id=#{id} \ No newline at end of file diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncTopicConfigTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncTopicConfigTask.java index 094f288c..60b6fbe8 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncTopicConfigTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncTopicConfigTask.java @@ -7,8 +7,8 @@ import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; -import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic; import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.TopicConfig; +import com.xiaojukeji.know.streaming.km.common.bean.po.topic.TopicPO; import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; @@ -44,18 +44,25 @@ public class SyncTopicConfigTask extends AbstractAsyncMetadataDispatchTask { public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { boolean success = true; - List topicConfigList = new ArrayList<>(); - for (Topic topic: topicService.listTopicsFromDB(clusterPhy.getId())) { - Result configResult = this.getTopicConfig(clusterPhy.getId(), topic.getTopicName()); + List changedConfigList = new ArrayList<>(); + for (TopicPO topicPO: topicService.listTopicPOsFromDB(clusterPhy.getId())) { + Result configResult = this.getTopicConfig(clusterPhy.getId(), topicPO.getTopicName()); if (configResult.failed()) { success = false; continue; } - topicConfigList.add(configResult.getData()); + TopicConfig config = configResult.getData(); + if (topicPO.getRetentionMs().equals(config.getRetentionMs())) { + // 数据无变化,不需要加入待更新列表中 + continue; + } + + config.setId(topicPO.getId()); + changedConfigList.add(configResult.getData()); } - topicService.batchReplaceConfig(clusterPhy.getId(), topicConfigList); + topicService.batchReplaceChangedConfig(clusterPhy.getId(), changedConfigList); return success? TaskResult.SUCCESS: TaskResult.FAIL; }