mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 03:42:07 +08:00
[Feature]新增Group及GroupOffset删除功能Part2 (#1084)
1、修复版本控制错误的问题; 2、增加相关权限点; PS:仅后端代码,前端待补充。
This commit is contained in:
@@ -44,6 +44,15 @@ INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_del
|
||||
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2042', '0', 'know-streaming');
|
||||
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2044', '0', 'know-streaming');
|
||||
|
||||
|
||||
-- 多集群管理权限2023-07-06新增
|
||||
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2046', 'Group-删除', '1593', '1', '2', 'Group-删除', '0', 'know-streaming');
|
||||
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2048', 'GroupOffset-Topic纬度删除', '1593', '1', '2', 'GroupOffset-Topic纬度删除', '0', 'know-streaming');
|
||||
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2050', 'GroupOffset-Partition纬度删除', '1593', '1', '2', 'GroupOffset-Partition纬度删除', '0', 'know-streaming');
|
||||
|
||||
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2046', '0', 'know-streaming');
|
||||
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2048', '0', 'know-streaming');
|
||||
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2050', '0', 'know-streaming');
|
||||
```
|
||||
|
||||
### 升级至 `3.3.0` 版本
|
||||
|
||||
@@ -151,7 +151,7 @@ public class GroupManagerImpl implements GroupManager {
|
||||
List<Group> groupList = groupService.listClusterGroups(clusterPhyId);
|
||||
|
||||
// 类型转化
|
||||
List<GroupOverviewVO> voList = groupList.stream().map(elem -> GroupConverter.convert2GroupOverviewVO(elem)).collect(Collectors.toList());
|
||||
List<GroupOverviewVO> voList = groupList.stream().map(GroupConverter::convert2GroupOverviewVO).collect(Collectors.toList());
|
||||
|
||||
// 搜索groupName
|
||||
voList = PaginationUtil.pageByFuzzyFilter(voList, dto.getSearchGroupName(), Arrays.asList("name"));
|
||||
@@ -301,7 +301,7 @@ public class GroupManagerImpl implements GroupManager {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "topicName不允许为空");
|
||||
}
|
||||
if (DeleteGroupTypeEnum.GROUP_TOPIC.getCode().equals(dto.getDeleteType())) {
|
||||
return opGroupService.deleteGroupOffset(
|
||||
return opGroupService.deleteGroupTopicOffset(
|
||||
new DeleteGroupTopicParam(dto.getClusterPhyId(), dto.getGroupName(), DeleteGroupTypeEnum.GROUP, dto.getTopicName()),
|
||||
operator
|
||||
);
|
||||
@@ -313,7 +313,7 @@ public class GroupManagerImpl implements GroupManager {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "partitionId不允许为空或小于0");
|
||||
}
|
||||
if (DeleteGroupTypeEnum.GROUP_TOPIC_PARTITION.getCode().equals(dto.getDeleteType())) {
|
||||
return opGroupService.deleteGroupOffset(
|
||||
return opGroupService.deleteGroupTopicPartitionOffset(
|
||||
new DeleteGroupTopicPartitionParam(dto.getClusterPhyId(), dto.getGroupName(), DeleteGroupTypeEnum.GROUP, dto.getTopicName(), dto.getPartitionId()),
|
||||
operator
|
||||
);
|
||||
|
||||
@@ -1,11 +1,15 @@
|
||||
package com.xiaojukeji.know.streaming.km.core.service.group;
|
||||
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupTopicParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupTopicPartitionParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
|
||||
public interface OpGroupService {
|
||||
/**
|
||||
* 删除Topic
|
||||
* 删除Offset
|
||||
*/
|
||||
Result<Void> deleteGroupOffset(DeleteGroupParam param, String operator);
|
||||
Result<Void> deleteGroupTopicOffset(DeleteGroupTopicParam param, String operator);
|
||||
Result<Void> deleteGroupTopicPartitionOffset(DeleteGroupTopicPartitionParam param, String operator);
|
||||
}
|
||||
|
||||
@@ -13,7 +13,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
|
||||
@@ -46,6 +45,8 @@ public class OpGroupServiceImpl extends BaseKafkaVersionControlService implement
|
||||
private static final ILog LOGGER = LogFactory.getLog(OpGroupServiceImpl.class);
|
||||
|
||||
private static final String DELETE_GROUP_OFFSET = "deleteGroupOffset";
|
||||
private static final String DELETE_GROUP_TOPIC_OFFSET = "deleteGroupTopicOffset";
|
||||
private static final String DELETE_GROUP_TP_OFFSET = "deleteGroupTopicPartitionOffset";
|
||||
|
||||
@Autowired
|
||||
private GroupDAO groupDAO;
|
||||
@@ -66,7 +67,9 @@ public class OpGroupServiceImpl extends BaseKafkaVersionControlService implement
|
||||
|
||||
@PostConstruct
|
||||
private void init() {
|
||||
registerVCHandler(DELETE_GROUP_OFFSET, V_1_1_0, V_MAX, "deleteGroupOffset", this::deleteGroupOffsetByClient);
|
||||
registerVCHandler(DELETE_GROUP_OFFSET, V_2_0_0, V_MAX, "deleteGroupOffsetByClient", this::deleteGroupOffsetByClient);
|
||||
registerVCHandler(DELETE_GROUP_TOPIC_OFFSET, V_2_4_0, V_MAX, "deleteGroupTopicOffsetByClient", this::deleteGroupTopicOffsetByClient);
|
||||
registerVCHandler(DELETE_GROUP_TP_OFFSET, V_2_4_0, V_MAX, "deleteGroupTopicPartitionOffsetByClient", this::deleteGroupTopicPartitionOffsetByClient);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -80,47 +83,75 @@ public class OpGroupServiceImpl extends BaseKafkaVersionControlService implement
|
||||
return rv;
|
||||
}
|
||||
|
||||
// 清理数据库中的数据
|
||||
if (DeleteGroupTypeEnum.GROUP.equals(param.getDeleteGroupTypeEnum())) {
|
||||
// 记录操作
|
||||
OplogDTO oplogDTO = new OplogDTO(operator,
|
||||
OperationEnum.DELETE.getDesc(),
|
||||
ModuleEnum.KAFKA_GROUP.getDesc(),
|
||||
String.format("集群ID:[%d] Group名称:[%s]", param.getClusterPhyId(), param.getGroupName()),
|
||||
String.format("删除Offset:[%s]", ConvertUtil.obj2Json(param))
|
||||
);
|
||||
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);
|
||||
// 记录操作
|
||||
OplogDTO oplogDTO = new OplogDTO(operator,
|
||||
OperationEnum.DELETE.getDesc(),
|
||||
ModuleEnum.KAFKA_GROUP.getDesc(),
|
||||
String.format("集群ID:[%d] Group名称:[%s]", param.getClusterPhyId(), param.getGroupName()),
|
||||
String.format("删除Offset:[%s]", ConvertUtil.obj2Json(param))
|
||||
);
|
||||
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);
|
||||
|
||||
// 清理Group数据
|
||||
this.deleteGroupInDB(param.getClusterPhyId(), param.getGroupName());
|
||||
this.deleteGroupMemberInDB(param.getClusterPhyId(), param.getGroupName());
|
||||
} else if (DeleteGroupTypeEnum.GROUP_TOPIC.equals(param.getDeleteGroupTypeEnum())) {
|
||||
// 记录操作
|
||||
DeleteGroupTopicParam topicParam = (DeleteGroupTopicParam) param;
|
||||
OplogDTO oplogDTO = new OplogDTO(operator,
|
||||
OperationEnum.DELETE.getDesc(),
|
||||
ModuleEnum.KAFKA_GROUP.getDesc(),
|
||||
String.format("集群ID:[%d] Group名称:[%s] Topic名称:[%s]", param.getClusterPhyId(), param.getGroupName(), topicParam.getTopicName()),
|
||||
String.format("删除Offset:[%s]", ConvertUtil.obj2Json(topicParam))
|
||||
);
|
||||
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);
|
||||
// 清理Group数据
|
||||
this.deleteGroupInDB(param.getClusterPhyId(), param.getGroupName());
|
||||
this.deleteGroupMemberInDB(param.getClusterPhyId(), param.getGroupName());
|
||||
|
||||
// 清理group + topic 数据
|
||||
this.deleteGroupMemberInDB(topicParam.getClusterPhyId(), topicParam.getGroupName(), topicParam.getTopicName());
|
||||
} else if (DeleteGroupTypeEnum.GROUP_TOPIC_PARTITION.equals(param.getDeleteGroupTypeEnum())) {
|
||||
// 记录操作
|
||||
DeleteGroupTopicPartitionParam partitionParam = (DeleteGroupTopicPartitionParam) param;
|
||||
OplogDTO oplogDTO = new OplogDTO(operator,
|
||||
OperationEnum.DELETE.getDesc(),
|
||||
ModuleEnum.KAFKA_GROUP.getDesc(),
|
||||
String.format("集群ID:[%d] Group名称:[%s] Topic名称:[%s] PartitionID:[%d]", param.getClusterPhyId(), param.getGroupName(), partitionParam.getTopicName(), partitionParam.getPartitionId()),
|
||||
String.format("删除Offset:[%s]", ConvertUtil.obj2Json(partitionParam))
|
||||
);
|
||||
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);
|
||||
return rv;
|
||||
} catch (VCHandlerNotExistException e) {
|
||||
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
|
||||
}
|
||||
}
|
||||
|
||||
// 不需要进行清理
|
||||
@Override
|
||||
public Result<Void> deleteGroupTopicOffset(DeleteGroupTopicParam param, String operator) {
|
||||
// 日志记录
|
||||
LOGGER.info("method=deleteGroupTopicOffset||param={}||operator={}||msg=delete group topic offset", ConvertUtil.obj2Json(param), operator);
|
||||
|
||||
try {
|
||||
Result<Void> rv = (Result<Void>) doVCHandler(param.getClusterPhyId(), DELETE_GROUP_TOPIC_OFFSET, param);
|
||||
if (rv == null || rv.failed()) {
|
||||
return rv;
|
||||
}
|
||||
|
||||
// 清理数据库中的数据
|
||||
// 记录操作
|
||||
OplogDTO oplogDTO = new OplogDTO(operator,
|
||||
OperationEnum.DELETE.getDesc(),
|
||||
ModuleEnum.KAFKA_GROUP.getDesc(),
|
||||
String.format("集群ID:[%d] Group名称:[%s] Topic名称:[%s]", param.getClusterPhyId(), param.getGroupName(), param.getTopicName()),
|
||||
String.format("删除Offset:[%s]", ConvertUtil.obj2Json(param))
|
||||
);
|
||||
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);
|
||||
|
||||
// 清理group + topic 数据
|
||||
this.deleteGroupMemberInDB(param.getClusterPhyId(), param.getGroupName(), param.getTopicName());
|
||||
|
||||
return rv;
|
||||
} catch (VCHandlerNotExistException e) {
|
||||
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<Void> deleteGroupTopicPartitionOffset(DeleteGroupTopicPartitionParam param, String operator) {
|
||||
// 日志记录
|
||||
LOGGER.info("method=deleteGroupTopicPartitionOffset||param={}||operator={}||msg=delete group topic partition offset", ConvertUtil.obj2Json(param), operator);
|
||||
|
||||
try {
|
||||
Result<Void> rv = (Result<Void>) doVCHandler(param.getClusterPhyId(), DELETE_GROUP_TP_OFFSET, param);
|
||||
if (rv == null || rv.failed()) {
|
||||
return rv;
|
||||
}
|
||||
|
||||
// 记录操作
|
||||
OplogDTO oplogDTO = new OplogDTO(operator,
|
||||
OperationEnum.DELETE.getDesc(),
|
||||
ModuleEnum.KAFKA_GROUP.getDesc(),
|
||||
String.format("集群ID:[%d] Group名称:[%s] Topic名称:[%s] PartitionID:[%d]", param.getClusterPhyId(), param.getGroupName(), param.getTopicName(), param.getPartitionId()),
|
||||
String.format("删除Offset:[%s]", ConvertUtil.obj2Json(param))
|
||||
);
|
||||
opLogWrapService.saveOplogAndIgnoreException(oplogDTO);
|
||||
|
||||
return rv;
|
||||
} catch (VCHandlerNotExistException e) {
|
||||
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
|
||||
@@ -130,20 +161,6 @@ public class OpGroupServiceImpl extends BaseKafkaVersionControlService implement
|
||||
/**************************************************** private method ****************************************************/
|
||||
|
||||
private Result<Void> deleteGroupOffsetByClient(VersionItemParam itemParam) {
|
||||
DeleteGroupParam deleteGroupParam = (DeleteGroupParam) itemParam;
|
||||
|
||||
if (DeleteGroupTypeEnum.GROUP.equals(deleteGroupParam.getDeleteGroupTypeEnum())) {
|
||||
return this.deleteGroupByClient(itemParam);
|
||||
} else if (DeleteGroupTypeEnum.GROUP_TOPIC.equals(deleteGroupParam.getDeleteGroupTypeEnum())) {
|
||||
return this.deleteGroupTopicOffsetByClient(itemParam);
|
||||
} else if (DeleteGroupTypeEnum.GROUP_TOPIC_PARTITION.equals(deleteGroupParam.getDeleteGroupTypeEnum())) {
|
||||
return this.deleteGroupTopicPartitionOffsetByClient(itemParam);
|
||||
}
|
||||
|
||||
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "删除Offset时,删除的类型参数非法");
|
||||
}
|
||||
|
||||
private Result<Void> deleteGroupByClient(VersionItemParam itemParam) {
|
||||
DeleteGroupParam param = (DeleteGroupParam) itemParam;
|
||||
try {
|
||||
AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId());
|
||||
@@ -156,7 +173,7 @@ public class OpGroupServiceImpl extends BaseKafkaVersionControlService implement
|
||||
deleteConsumerGroupsResult.all().get();
|
||||
} catch (Exception e) {
|
||||
LOGGER.error(
|
||||
"method=deleteGroupByClient||clusterPhyId={}||groupName={}||errMsg=delete group failed||msg=exception!",
|
||||
"method=deleteGroupOffsetByClient||clusterPhyId={}||groupName={}||errMsg=delete group failed||msg=exception!",
|
||||
param.getClusterPhyId(), param.getGroupName(), e
|
||||
);
|
||||
|
||||
@@ -172,7 +189,7 @@ public class OpGroupServiceImpl extends BaseKafkaVersionControlService implement
|
||||
AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId());
|
||||
|
||||
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList(
|
||||
param.getTopicName()),
|
||||
param.getTopicName()),
|
||||
new DescribeTopicsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)
|
||||
);
|
||||
|
||||
|
||||
@@ -39,8 +39,12 @@ public class FrontEndControlVersionItems extends BaseMetricVersionMetric {
|
||||
private static final String FE_TRUNCATE_TOPIC = "FETruncateTopic";
|
||||
|
||||
private static final String FE_DELETE_GROUP_OFFSET = "FEDeleteGroupOffset";
|
||||
private static final String FE_DELETE_GROUP_TOPIC_OFFSET = "FEDeleteGroupTopicOffset";
|
||||
private static final String FE_DELETE_GROUP_TOPIC_PARTITION_OFFSET = "FEDeleteGroupTopicPartitionOffset";
|
||||
|
||||
public FrontEndControlVersionItems(){}
|
||||
public FrontEndControlVersionItems() {
|
||||
// ignore
|
||||
}
|
||||
|
||||
@Override
|
||||
public int versionItemType() {
|
||||
@@ -97,9 +101,13 @@ public class FrontEndControlVersionItems extends BaseMetricVersionMetric {
|
||||
itemList.add(buildItem().minVersion(VersionEnum.V_0_11_0_0).maxVersion(VersionEnum.V_MAX)
|
||||
.name(FE_TRUNCATE_TOPIC).desc("清空Topic"));
|
||||
|
||||
// truncate topic
|
||||
itemList.add(buildItem().minVersion(VersionEnum.V_1_1_0).maxVersion(VersionEnum.V_MAX)
|
||||
// 删除Offset
|
||||
itemList.add(buildItem().minVersion(VersionEnum.V_2_0_0).maxVersion(VersionEnum.V_MAX)
|
||||
.name(FE_DELETE_GROUP_OFFSET).desc("删除GroupOffset"));
|
||||
itemList.add(buildItem().minVersion(VersionEnum.V_2_4_0).maxVersion(VersionEnum.V_MAX)
|
||||
.name(FE_DELETE_GROUP_TOPIC_OFFSET).desc("删除GroupTopicOffset"));
|
||||
itemList.add(buildItem().minVersion(VersionEnum.V_2_4_0).maxVersion(VersionEnum.V_MAX)
|
||||
.name(FE_DELETE_GROUP_TOPIC_PARTITION_OFFSET).desc("删除GroupTopicPartitionOffset"));
|
||||
return itemList;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -147,3 +147,13 @@ INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_del
|
||||
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2040', '0', 'know-streaming');
|
||||
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2042', '0', 'know-streaming');
|
||||
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2044', '0', 'know-streaming');
|
||||
|
||||
|
||||
-- 多集群管理权限2023-07-06新增
|
||||
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2046', 'Group-删除', '1593', '1', '2', 'Group-删除', '0', 'know-streaming');
|
||||
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2048', 'GroupOffset-Topic纬度删除', '1593', '1', '2', 'GroupOffset-Topic纬度删除', '0', 'know-streaming');
|
||||
INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2050', 'GroupOffset-Partition纬度删除', '1593', '1', '2', 'GroupOffset-Partition纬度删除', '0', 'know-streaming');
|
||||
|
||||
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2046', '0', 'know-streaming');
|
||||
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2048', '0', 'know-streaming');
|
||||
INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2050', '0', 'know-streaming');
|
||||
|
||||
Reference in New Issue
Block a user