mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
[Bugfix]修复Truncate数据不生效的问题
This commit is contained in:
@@ -7,6 +7,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicCreateDTO;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicExpansionDTO;
|
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicExpansionDTO;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||||
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.config.KafkaTopicConfigParam;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
|
||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicPartitionExpandParam;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicPartitionExpandParam;
|
||||||
@@ -17,17 +18,17 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
|||||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
|
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
|
||||||
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
|
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
|
||||||
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
|
import com.xiaojukeji.know.streaming.km.common.utils.*;
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
|
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
|
||||||
import com.xiaojukeji.know.streaming.km.common.utils.kafka.KafkaReplicaAssignUtil;
|
import com.xiaojukeji.know.streaming.km.common.utils.kafka.KafkaReplicaAssignUtil;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
|
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.topic.OpTopicService;
|
import com.xiaojukeji.know.streaming.km.core.service.topic.OpTopicService;
|
||||||
|
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
|
||||||
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
|
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
|
||||||
import kafka.admin.AdminUtils;
|
import kafka.admin.AdminUtils;
|
||||||
import kafka.admin.BrokerMetadata;
|
import kafka.admin.BrokerMetadata;
|
||||||
|
import org.apache.kafka.common.config.TopicConfig;
|
||||||
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
import org.springframework.stereotype.Component;
|
import org.springframework.stereotype.Component;
|
||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
@@ -61,6 +62,9 @@ public class OpTopicManagerImpl implements OpTopicManager {
|
|||||||
@Autowired
|
@Autowired
|
||||||
private PartitionService partitionService;
|
private PartitionService partitionService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private TopicConfigService topicConfigService;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Result<Void> createTopic(TopicCreateDTO dto, String operator) {
|
public Result<Void> createTopic(TopicCreateDTO dto, String operator) {
|
||||||
log.info("method=createTopic||param={}||operator={}.", dto, operator);
|
log.info("method=createTopic||param={}||operator={}.", dto, operator);
|
||||||
@@ -160,10 +164,27 @@ public class OpTopicManagerImpl implements OpTopicManager {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Result<Void> truncateTopic(Long clusterPhyId, String topicName, String operator) {
|
public Result<Void> truncateTopic(Long clusterPhyId, String topicName, String operator) {
|
||||||
|
// 增加delete配置
|
||||||
|
Result<Tuple<Boolean, String>> rt = this.addDeleteConfigIfNotExist(clusterPhyId, topicName, operator);
|
||||||
|
if (rt.failed()) {
|
||||||
|
log.error("method=truncateTopic||clusterPhyId={}||topicName={}||operator={}||result={}||msg=get config from kafka failed", clusterPhyId, topicName, operator, rt);
|
||||||
|
return Result.buildFromIgnoreData(rt);
|
||||||
|
}
|
||||||
|
|
||||||
// 清空Topic
|
// 清空Topic
|
||||||
Result<Void> rv = opTopicService.truncateTopic(new TopicTruncateParam(clusterPhyId, topicName, KafkaConstant.TOPICK_TRUNCATE_DEFAULT_OFFSET), operator);
|
Result<Void> rv = opTopicService.truncateTopic(new TopicTruncateParam(clusterPhyId, topicName, KafkaConstant.TOPICK_TRUNCATE_DEFAULT_OFFSET), operator);
|
||||||
if (rv.failed()) {
|
if (rv.failed()) {
|
||||||
return rv;
|
log.error("method=truncateTopic||clusterPhyId={}||topicName={}||originConfig={}||operator={}||result={}||msg=truncate topic failed", clusterPhyId, topicName, rt.getData().v2(), operator, rv);
|
||||||
|
// config被修改了,则错误提示需要提醒一下,否则直接返回错误
|
||||||
|
return rt.getData().v1() ? Result.buildFailure(rv.getCode(), rv.getMessage() + "\t\n" + String.format("Topic的CleanupPolicy已被修改,需要手动恢复为%s", rt.getData().v2())) : rv;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 恢复compact配置
|
||||||
|
rv = this.recoverConfigIfChanged(clusterPhyId, topicName, rt.getData().v1(), rt.getData().v2(), operator);
|
||||||
|
if (rv.failed()) {
|
||||||
|
log.error("method=truncateTopic||clusterPhyId={}||topicName={}||originConfig={}||operator={}||result={}||msg=truncate topic success but recover config failed", clusterPhyId, topicName, rt.getData().v2(), operator, rv);
|
||||||
|
// config被修改了,则错误提示需要提醒一下,否则直接返回错误
|
||||||
|
return Result.buildFailure(rv.getCode(), String.format("Topic清空操作已成功,但是恢复CleanupPolicy配置失败,需要手动恢复为%s。", rt.getData().v2()) + "\t\n" + rv.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
return Result.buildSuc();
|
return Result.buildSuc();
|
||||||
@@ -171,6 +192,44 @@ public class OpTopicManagerImpl implements OpTopicManager {
|
|||||||
|
|
||||||
/**************************************************** private method ****************************************************/
|
/**************************************************** private method ****************************************************/
|
||||||
|
|
||||||
|
private Result<Tuple<Boolean, String>> addDeleteConfigIfNotExist(Long clusterPhyId, String topicName, String operator) {
|
||||||
|
// 获取Topic配置
|
||||||
|
Result<Map<String, String>> configMapResult = topicConfigService.getTopicConfigFromKafka(clusterPhyId, topicName);
|
||||||
|
if (configMapResult.failed()) {
|
||||||
|
return Result.buildFromIgnoreData(configMapResult);
|
||||||
|
}
|
||||||
|
|
||||||
|
String cleanupPolicyValue = configMapResult.getData().getOrDefault(TopicConfig.CLEANUP_POLICY_CONFIG, "");
|
||||||
|
List<String> cleanupPolicyValueList = CommonUtils.string2StrList(cleanupPolicyValue);
|
||||||
|
if (cleanupPolicyValueList.size() == 1 && cleanupPolicyValueList.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
|
||||||
|
// 不需要修改
|
||||||
|
return Result.buildSuc(new Tuple<>(Boolean.FALSE, cleanupPolicyValue));
|
||||||
|
}
|
||||||
|
|
||||||
|
Map<String, String> changedConfigMap = new HashMap<>(1);
|
||||||
|
changedConfigMap.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
|
||||||
|
|
||||||
|
Result<Void> rv = topicConfigService.modifyTopicConfig(new KafkaTopicConfigParam(clusterPhyId, topicName, changedConfigMap), operator);
|
||||||
|
if (rv.failed()) {
|
||||||
|
// 修改失败
|
||||||
|
return Result.buildFromIgnoreData(rv);
|
||||||
|
}
|
||||||
|
|
||||||
|
return Result.buildSuc(new Tuple<>(Boolean.TRUE, cleanupPolicyValue));
|
||||||
|
}
|
||||||
|
|
||||||
|
private Result<Void> recoverConfigIfChanged(Long clusterPhyId, String topicName, Boolean changed, String originValue, String operator) {
|
||||||
|
if (!changed) {
|
||||||
|
// 没有修改,直接返回
|
||||||
|
return Result.buildSuc();
|
||||||
|
}
|
||||||
|
|
||||||
|
// 恢复配置
|
||||||
|
Map<String, String> changedConfigMap = new HashMap<>(1);
|
||||||
|
changedConfigMap.put(TopicConfig.CLEANUP_POLICY_CONFIG, originValue);
|
||||||
|
|
||||||
|
return topicConfigService.modifyTopicConfig(new KafkaTopicConfigParam(clusterPhyId, topicName, changedConfigMap), operator);
|
||||||
|
}
|
||||||
|
|
||||||
private Seq<BrokerMetadata> buildBrokerMetadataSeq(Long clusterPhyId, final List<Integer> selectedBrokerIdList) {
|
private Seq<BrokerMetadata> buildBrokerMetadataSeq(Long clusterPhyId, final List<Integer> selectedBrokerIdList) {
|
||||||
// 选取Broker列表
|
// 选取Broker列表
|
||||||
|
|||||||
Reference in New Issue
Block a user