Merge branch 'fix_1043' into ve_3.x_dev

This commit is contained in:
qiao.zeng
2023-11-12 15:31:18 +08:00

View File

@@ -7,6 +7,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicCreateDTO;
import com.xiaojukeji.know.streaming.km.common.bean.dto.topic.TopicExpansionDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.broker.Broker;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.config.KafkaTopicConfigParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicPartitionExpandParam;
@@ -17,17 +18,17 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.utils.BackoffUtils;
import com.xiaojukeji.know.streaming.km.common.utils.FutureUtil;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.common.utils.*;
import com.xiaojukeji.know.streaming.km.common.utils.kafka.KafkaReplicaAssignUtil;
import com.xiaojukeji.know.streaming.km.core.service.broker.BrokerService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService;
import com.xiaojukeji.know.streaming.km.core.service.topic.OpTopicService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicConfigService;
import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService;
import kafka.admin.AdminUtils;
import kafka.admin.BrokerMetadata;
import org.apache.kafka.common.config.TopicConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
@@ -61,6 +62,9 @@ public class OpTopicManagerImpl implements OpTopicManager {
@Autowired
private PartitionService partitionService;
@Autowired
private TopicConfigService topicConfigService;
@Override
public Result<Void> createTopic(TopicCreateDTO dto, String operator) {
log.info("method=createTopic||param={}||operator={}.", dto, operator);
@@ -160,10 +164,27 @@ public class OpTopicManagerImpl implements OpTopicManager {
@Override
public Result<Void> truncateTopic(Long clusterPhyId, String topicName, String operator) {
// 增加delete配置
Result<Tuple<Boolean, String>> rt = this.addDeleteConfigIfNotExist(clusterPhyId, topicName, operator);
if (rt.failed()) {
log.error("method=truncateTopic||clusterPhyId={}||topicName={}||operator={}||result={}||msg=get config from kafka failed", clusterPhyId, topicName, operator, rt);
return Result.buildFromIgnoreData(rt);
}
// 清空Topic
Result<Void> rv = opTopicService.truncateTopic(new TopicTruncateParam(clusterPhyId, topicName, KafkaConstant.TOPICK_TRUNCATE_DEFAULT_OFFSET), operator);
if (rv.failed()) {
return rv;
log.error("method=truncateTopic||clusterPhyId={}||topicName={}||originConfig={}||operator={}||result={}||msg=truncate topic failed", clusterPhyId, topicName, rt.getData().v2(), operator, rv);
// config被修改了则错误提示需要提醒一下否则直接返回错误
return rt.getData().v1() ? Result.buildFailure(rv.getCode(), rv.getMessage() + "\t\n" + String.format("Topic的CleanupPolicy已被修改需要手动恢复为%s", rt.getData().v2())) : rv;
}
// 恢复compact配置
rv = this.recoverConfigIfChanged(clusterPhyId, topicName, rt.getData().v1(), rt.getData().v2(), operator);
if (rv.failed()) {
log.error("method=truncateTopic||clusterPhyId={}||topicName={}||originConfig={}||operator={}||result={}||msg=truncate topic success but recover config failed", clusterPhyId, topicName, rt.getData().v2(), operator, rv);
// config被修改了则错误提示需要提醒一下否则直接返回错误
return Result.buildFailure(rv.getCode(), String.format("Topic清空操作已成功但是恢复CleanupPolicy配置失败需要手动恢复为%s。", rt.getData().v2()) + "\t\n" + rv.getMessage());
}
return Result.buildSuc();
@@ -171,6 +192,44 @@ public class OpTopicManagerImpl implements OpTopicManager {
/**************************************************** private method ****************************************************/
private Result<Tuple<Boolean, String>> addDeleteConfigIfNotExist(Long clusterPhyId, String topicName, String operator) {
// 获取Topic配置
Result<Map<String, String>> configMapResult = topicConfigService.getTopicConfigFromKafka(clusterPhyId, topicName);
if (configMapResult.failed()) {
return Result.buildFromIgnoreData(configMapResult);
}
String cleanupPolicyValue = configMapResult.getData().getOrDefault(TopicConfig.CLEANUP_POLICY_CONFIG, "");
List<String> cleanupPolicyValueList = CommonUtils.string2StrList(cleanupPolicyValue);
if (cleanupPolicyValueList.size() == 1 && cleanupPolicyValueList.contains(TopicConfig.CLEANUP_POLICY_DELETE)) {
// 不需要修改
return Result.buildSuc(new Tuple<>(Boolean.FALSE, cleanupPolicyValue));
}
Map<String, String> changedConfigMap = new HashMap<>(1);
changedConfigMap.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_DELETE);
Result<Void> rv = topicConfigService.modifyTopicConfig(new KafkaTopicConfigParam(clusterPhyId, topicName, changedConfigMap), operator);
if (rv.failed()) {
// 修改失败
return Result.buildFromIgnoreData(rv);
}
return Result.buildSuc(new Tuple<>(Boolean.TRUE, cleanupPolicyValue));
}
private Result<Void> recoverConfigIfChanged(Long clusterPhyId, String topicName, Boolean changed, String originValue, String operator) {
if (!changed) {
// 没有修改,直接返回
return Result.buildSuc();
}
// 恢复配置
Map<String, String> changedConfigMap = new HashMap<>(1);
changedConfigMap.put(TopicConfig.CLEANUP_POLICY_CONFIG, originValue);
return topicConfigService.modifyTopicConfig(new KafkaTopicConfigParam(clusterPhyId, topicName, changedConfigMap), operator);
}
private Seq<BrokerMetadata> buildBrokerMetadataSeq(Long clusterPhyId, final List<Integer> selectedBrokerIdList) {
// 选取Broker列表