From 251f7f711094649368d2139ab903e45b22e49ca2 Mon Sep 17 00:00:00 2001 From: "qiao.zeng" Date: Sun, 12 Nov 2023 15:06:10 +0800 Subject: [PATCH] =?UTF-8?q?[Bugfix]=E4=BF=AE=E5=A4=8DTruncate=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E4=B8=8D=E7=94=9F=E6=95=88=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../km/biz/topic/impl/OpTopicManagerImpl.java | 67 +++++++++++++++++-- 1 file changed, 63 insertions(+), 4 deletions(-) diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/OpTopicManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/OpTopicManagerImpl.java index 22d204ea..42459447 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/OpTopicManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/topic/impl/OpTopicManagerImpl.java @@ -7,6 +7,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicCreateDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicExpansionDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.config.KafkaTopicConfigParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicPartitionExpandParam; @@ -17,17 +18,17 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; -import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils; -import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil; -import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; +import com.xiaojukeji.know.streaming.km.common.utils.*; import com.xiaojukeji.know.streaming.km.common.utils.kafka.KafkaReplicaAssignUtil; import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.topic.OpTopicService; +import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import kafka.admin.AdminUtils; import kafka.admin.BrokerMetadata; +import org.apache.kafka.common.config.TopicConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; @@ -61,6 +62,9 @@ public class OpTopicManagerImpl implements OpTopicManager { @Autowired private PartitionService partitionService; + @Autowired + private TopicConfigService topicConfigService; + @Override public Result createTopic(TopicCreateDTO dto, String operator) { log.info("method=createTopic||param={}||operator={}.", dto, operator); @@ -160,10 +164,27 @@ public class OpTopicManagerImpl implements OpTopicManager { @Override public Result truncateTopic(Long clusterPhyId, String topicName, String operator) { + // 增加delete配置 + Result> rt = this.addDeleteConfigIfNotExist(clusterPhyId, topicName, operator); + if (rt.failed()) { + log.error("method=truncateTopic||clusterPhyId={}||topicName={}||operator={}||result={}||msg=get config from kafka failed", clusterPhyId, topicName, operator, rt); + return Result.buildFromIgnoreData(rt); + } + // 清空Topic Result rv = opTopicService.truncateTopic(new TopicTruncateParam(clusterPhyId, topicName, KafkaConstant.TOPICK_TRUNCATE_DEFAULT_OFFSET), operator); if (rv.failed()) { - return rv; + log.error("method=truncateTopic||clusterPhyId={}||topicName={}||originConfig={}||operator={}||result={}||msg=truncate topic failed", clusterPhyId, topicName, rt.getData().v2(), operator, rv); + // config被修改了,则错误提示需要提醒一下,否则直接返回错误 + return rt.getData().v1() ? Result.buildFailure(rv.getCode(), rv.getMessage() + "\t\n" + String.format("Topic的CleanupPolicy已被修改,需要手动恢复为%s", rt.getData().v2())) : rv; + } + + // 恢复compact配置 + rv = this.recoverConfigIfChanged(clusterPhyId, topicName, rt.getData().v1(), rt.getData().v2(), operator); + if (rv.failed()) { + log.error("method=truncateTopic||clusterPhyId={}||topicName={}||originConfig={}||operator={}||result={}||msg=truncate topic success but recover config failed", clusterPhyId, topicName, rt.getData().v2(), operator, rv); + // config被修改了,则错误提示需要提醒一下,否则直接返回错误 + return Result.buildFailure(rv.getCode(), String.format("Topic清空操作已成功,但是恢复CleanupPolicy配置失败,需要手动恢复为%s。", rt.getData().v2()) + "\t\n" + rv.getMessage()); } return Result.buildSuc(); @@ -171,6 +192,44 @@ public class OpTopicManagerImpl implements OpTopicManager { /**************************************************** private method ****************************************************/ + private Result> addDeleteConfigIfNotExist(Long clusterPhyId, String topicName, String operator) { + // 获取Topic配置 + Result> configMapResult = topicConfigService.getTopicConfigFromKafka(clusterPhyId, topicName); + if (configMapResult.failed()) { + return Result.buildFromIgnoreData(configMapResult); + } + + String cleanupPolicyValue = configMapResult.getData().getOrDefault(TopicConfig.CLEANUP_POLICY_CONFIG, ""); + List cleanupPolicyValueList = CommonUtils.string2StrList(cleanupPolicyValue); + if (cleanupPolicyValueList.size() == 1 && cleanupPolicyValueList.contains(TopicConfig.CLEANUP_POLICY_DELETE)) { + // 不需要修改 + return Result.buildSuc(new Tuple<>(Boolean.FALSE, cleanupPolicyValue)); + } + + Map changedConfigMap = new HashMap<>(1); + changedConfigMap.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE); + + Result rv = topicConfigService.modifyTopicConfig(new KafkaTopicConfigParam(clusterPhyId, topicName, changedConfigMap), operator); + if (rv.failed()) { + // 修改失败 + return Result.buildFromIgnoreData(rv); + } + + return Result.buildSuc(new Tuple<>(Boolean.TRUE, cleanupPolicyValue)); + } + + private Result recoverConfigIfChanged(Long clusterPhyId, String topicName, Boolean changed, String originValue, String operator) { + if (!changed) { + // 没有修改,直接返回 + return Result.buildSuc(); + } + + // 恢复配置 + Map changedConfigMap = new HashMap<>(1); + changedConfigMap.put(TopicConfig.CLEANUP_POLICY_CONFIG, originValue); + + return topicConfigService.modifyTopicConfig(new KafkaTopicConfigParam(clusterPhyId, topicName, changedConfigMap), operator); + } private Seq buildBrokerMetadataSeq(Long clusterPhyId, final List selectedBrokerIdList) { // 选取Broker列表