mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
[Feature]增加Truncate数据功能(#1062)
增加Truncate数据功能(#1043) 目前已经完成后端部分,前端待补充。 --------- Co-authored-by: duanxiaoqiu <duanxiaoqiu@qiyi.com>
This commit is contained in:
@@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.core.service.topic;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicPartitionExpandParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicTruncateParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic;
|
||||
|
||||
@@ -21,4 +22,9 @@ public interface OpTopicService {
|
||||
* 扩分区
|
||||
*/
|
||||
Result<Void> expandTopic(TopicPartitionExpandParam expandParam, String operator);
|
||||
|
||||
/**
|
||||
* 清空topic消息
|
||||
*/
|
||||
Result<Void> truncateTopic(TopicTruncateParam param, String operator);
|
||||
}
|
||||
|
||||
@@ -8,6 +8,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemPara
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicCreateParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicPartitionExpandParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.topic.TopicTruncateParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
|
||||
@@ -33,6 +34,7 @@ import kafka.zk.AdminZkClient;
|
||||
import kafka.zk.KafkaZkClient;
|
||||
import org.apache.kafka.clients.admin.*;
|
||||
import org.apache.kafka.common.TopicPartition;
|
||||
import org.apache.kafka.common.TopicPartitionInfo;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.stereotype.Service;
|
||||
import scala.Option;
|
||||
@@ -57,6 +59,7 @@ public class OpTopicServiceImpl extends BaseKafkaVersionControlService implement
|
||||
private static final String TOPIC_CREATE = "createTopic";
|
||||
private static final String TOPIC_DELETE = "deleteTopic";
|
||||
private static final String TOPIC_EXPAND = "expandTopic";
|
||||
private static final String TOPIC_TRUNCATE = "truncateTopic";
|
||||
|
||||
@Autowired
|
||||
private TopicService topicService;
|
||||
@@ -92,6 +95,8 @@ public class OpTopicServiceImpl extends BaseKafkaVersionControlService implement
|
||||
|
||||
registerVCHandler(TOPIC_EXPAND, V_0_10_0_0, V_0_11_0_3, "expandTopicByZKClient", this::expandTopicByZKClient);
|
||||
registerVCHandler(TOPIC_EXPAND, V_0_11_0_3, V_MAX, "expandTopicByKafkaClient", this::expandTopicByKafkaClient);
|
||||
|
||||
registerVCHandler(TOPIC_TRUNCATE, V_0_11_0_0, V_MAX, "truncateTopicByKafkaClient", this::truncateTopicByKafkaClient);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -203,9 +208,58 @@ public class OpTopicServiceImpl extends BaseKafkaVersionControlService implement
|
||||
return rv;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<Void> truncateTopic(TopicTruncateParam param, String operator) {
|
||||
try {
|
||||
// 清空topic数据
|
||||
Result<Void> rv = (Result<Void>) doVCHandler(param.getClusterPhyId(), TOPIC_TRUNCATE, param);
|
||||
|
||||
if (rv == null || rv.failed()) {
|
||||
return rv;
|
||||
}
|
||||
|
||||
// 记录操作
|
||||
OplogDTO oplogDTO = new OplogDTO(operator,
|
||||
OperationEnum.TRUNCATE.getDesc(),
|
||||
ModuleEnum.KAFKA_TOPIC.getDesc(),
|
||||
MsgConstant.getTopicBizStr(param.getClusterPhyId(), param.getTopicName()),
|
||||
String.format("清空Topic:[%s]", param.toString()));
|
||||
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);
|
||||
return rv;
|
||||
} catch (VCHandlerNotExistException e) {
|
||||
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
|
||||
}
|
||||
}
|
||||
|
||||
/**************************************************** private method ****************************************************/
|
||||
|
||||
private Result<Void> truncateTopicByKafkaClient(VersionItemParam itemParam) {
|
||||
TopicTruncateParam param = (TopicTruncateParam) itemParam;
|
||||
try {
|
||||
AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId());
|
||||
//获取topic的分区信息
|
||||
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(param.getTopicName()), new DescribeTopicsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS));
|
||||
Map<String, TopicDescription> descriptionMap = describeTopicsResult.all().get();
|
||||
|
||||
Map<TopicPartition, RecordsToDelete> recordsToDelete = new HashMap<>();
|
||||
RecordsToDelete recordsToDeleteOffset = RecordsToDelete.beforeOffset(param.getOffset());
|
||||
|
||||
descriptionMap.forEach((topicName, topicDescription) -> {
|
||||
for (TopicPartitionInfo topicPartition : topicDescription.partitions()) {
|
||||
recordsToDelete.put(new TopicPartition(topicName, topicPartition.partition()), recordsToDeleteOffset);
|
||||
}
|
||||
});
|
||||
|
||||
DeleteRecordsResult deleteRecordsResult = adminClient.deleteRecords(recordsToDelete, new DeleteRecordsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS));
|
||||
deleteRecordsResult.all().get();
|
||||
} catch (Exception e) {
|
||||
log.error("truncate topic by kafka-client failed,clusterPhyId:{} topicName:{} offset:{}", param.getClusterPhyId(), param.getTopicName(), param.getOffset(), e);
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
|
||||
}
|
||||
|
||||
return Result.buildSuc();
|
||||
}
|
||||
|
||||
private Result<Void> deleteByKafkaClient(VersionItemParam itemParam) {
|
||||
TopicParam param = (TopicParam) itemParam;
|
||||
|
||||
@@ -36,6 +36,8 @@ public class FrontEndControlVersionItems extends BaseMetricVersionMetric {
|
||||
private static final String FE_HA_CREATE_MIRROR_TOPIC = "FEHaCreateMirrorTopic";
|
||||
private static final String FE_HA_DELETE_MIRROR_TOPIC = "FEHaDeleteMirrorTopic";
|
||||
|
||||
private static final String FE_TRUNCATE_TOPIC = "FETruncateTopic";
|
||||
|
||||
public FrontEndControlVersionItems(){}
|
||||
|
||||
@Override
|
||||
@@ -89,6 +91,10 @@ public class FrontEndControlVersionItems extends BaseMetricVersionMetric {
|
||||
itemList.add(buildItem().minVersion(VersionEnum.V_2_5_0_D_300).maxVersion(VersionEnum.V_2_5_0_D_MAX)
|
||||
.name(FE_HA_DELETE_MIRROR_TOPIC).desc("HA-取消Topic复制"));
|
||||
|
||||
//truncate topic
|
||||
itemList.add(buildItem().minVersion(VersionEnum.V_0_11_0_0).maxVersion(VersionEnum.V_MAX)
|
||||
.name(FE_TRUNCATE_TOPIC).desc("清空topic"));
|
||||
|
||||
return itemList;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user