From dd3dcd37e9e294ea341dfa131c48d8a2490609ac Mon Sep 17 00:00:00 2001 From: EricZeng Date: Thu, 6 Jul 2023 15:42:18 +0800 Subject: [PATCH] =?UTF-8?q?[Feature]=E6=96=B0=E5=A2=9EGroup=E5=8F=8AGroupO?= =?UTF-8?q?ffset=E5=88=A0=E9=99=A4=E5=8A=9F=E8=83=BDPart2=20(#1084)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1、修复版本控制错误的问题; 2、增加相关权限点; PS:仅后端代码,前端待补充。 --- docs/install_guide/版本升级手册.md | 9 ++ .../km/biz/group/impl/GroupManagerImpl.java | 6 +- .../km/core/service/group/OpGroupService.java | 6 +- .../group/impl/OpGroupServiceImpl.java | 125 ++++++++++-------- .../fe/FrontEndControlVersionItems.java | 14 +- .../src/main/resources/sql/dml-logi.sql | 10 ++ 6 files changed, 109 insertions(+), 61 deletions(-) diff --git a/docs/install_guide/版本升级手册.md b/docs/install_guide/版本升级手册.md index ba8fa8d8..675a9dab 100644 --- a/docs/install_guide/版本升级手册.md +++ b/docs/install_guide/版本升级手册.md @@ -44,6 +44,15 @@ INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_del INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2042', '0', 'know-streaming'); INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2044', '0', 'know-streaming'); + +-- 多集群管理权限2023-07-06新增 +INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2046', 'Group-删除', '1593', '1', '2', 'Group-删除', '0', 'know-streaming'); +INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2048', 'GroupOffset-Topic纬度删除', '1593', '1', '2', 'GroupOffset-Topic纬度删除', '0', 'know-streaming'); +INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2050', 'GroupOffset-Partition纬度删除', '1593', '1', '2', 'GroupOffset-Partition纬度删除', '0', 'know-streaming'); + +INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2046', '0', 'know-streaming'); +INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2048', '0', 'know-streaming'); +INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2050', '0', 'know-streaming'); ``` ### 升级至 `3.3.0` 版本 diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java index 1f37c368..753768df 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java @@ -151,7 +151,7 @@ public class GroupManagerImpl implements GroupManager { List groupList = groupService.listClusterGroups(clusterPhyId); // 类型转化 - List voList = groupList.stream().map(elem -> GroupConverter.convert2GroupOverviewVO(elem)).collect(Collectors.toList()); + List voList = groupList.stream().map(GroupConverter::convert2GroupOverviewVO).collect(Collectors.toList()); // 搜索groupName voList = PaginationUtil.pageByFuzzyFilter(voList, dto.getSearchGroupName(), Arrays.asList("name")); @@ -301,7 +301,7 @@ public class GroupManagerImpl implements GroupManager { return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "topicName不允许为空"); } if (DeleteGroupTypeEnum.GROUP_TOPIC.getCode().equals(dto.getDeleteType())) { - return opGroupService.deleteGroupOffset( + return opGroupService.deleteGroupTopicOffset( new DeleteGroupTopicParam(dto.getClusterPhyId(), dto.getGroupName(), DeleteGroupTypeEnum.GROUP, dto.getTopicName()), operator ); @@ -313,7 +313,7 @@ public class GroupManagerImpl implements GroupManager { return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "partitionId不允许为空或小于0"); } if (DeleteGroupTypeEnum.GROUP_TOPIC_PARTITION.getCode().equals(dto.getDeleteType())) { - return opGroupService.deleteGroupOffset( + return opGroupService.deleteGroupTopicPartitionOffset( new DeleteGroupTopicPartitionParam(dto.getClusterPhyId(), dto.getGroupName(), DeleteGroupTypeEnum.GROUP, dto.getTopicName(), dto.getPartitionId()), operator ); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/OpGroupService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/OpGroupService.java index 83285a48..dbd6bae5 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/OpGroupService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/OpGroupService.java @@ -1,11 +1,15 @@ package com.xiaojukeji.know.streaming.km.core.service.group; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupTopicParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupTopicPartitionParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; public interface OpGroupService { /** - * 删除Topic + * 删除Offset */ Result deleteGroupOffset(DeleteGroupParam param, String operator); + Result deleteGroupTopicOffset(DeleteGroupTopicParam param, String operator); + Result deleteGroupTopicPartitionOffset(DeleteGroupTopicPartitionParam param, String operator); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/OpGroupServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/OpGroupServiceImpl.java index b59519b9..e82c36a9 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/OpGroupServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/OpGroupServiceImpl.java @@ -13,7 +13,6 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO; import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupPO; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; -import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; @@ -46,6 +45,8 @@ public class OpGroupServiceImpl extends BaseKafkaVersionControlService implement private static final ILog LOGGER = LogFactory.getLog(OpGroupServiceImpl.class); private static final String DELETE_GROUP_OFFSET = "deleteGroupOffset"; + private static final String DELETE_GROUP_TOPIC_OFFSET = "deleteGroupTopicOffset"; + private static final String DELETE_GROUP_TP_OFFSET = "deleteGroupTopicPartitionOffset"; @Autowired private GroupDAO groupDAO; @@ -66,7 +67,9 @@ public class OpGroupServiceImpl extends BaseKafkaVersionControlService implement @PostConstruct private void init() { - registerVCHandler(DELETE_GROUP_OFFSET, V_1_1_0, V_MAX, "deleteGroupOffset", this::deleteGroupOffsetByClient); + registerVCHandler(DELETE_GROUP_OFFSET, V_2_0_0, V_MAX, "deleteGroupOffsetByClient", this::deleteGroupOffsetByClient); + registerVCHandler(DELETE_GROUP_TOPIC_OFFSET, V_2_4_0, V_MAX, "deleteGroupTopicOffsetByClient", this::deleteGroupTopicOffsetByClient); + registerVCHandler(DELETE_GROUP_TP_OFFSET, V_2_4_0, V_MAX, "deleteGroupTopicPartitionOffsetByClient", this::deleteGroupTopicPartitionOffsetByClient); } @Override @@ -80,47 +83,75 @@ public class OpGroupServiceImpl extends BaseKafkaVersionControlService implement return rv; } - // 清理数据库中的数据 - if (DeleteGroupTypeEnum.GROUP.equals(param.getDeleteGroupTypeEnum())) { - // 记录操作 - OplogDTO oplogDTO = new OplogDTO(operator, - OperationEnum.DELETE.getDesc(), - ModuleEnum.KAFKA_GROUP.getDesc(), - String.format("集群ID:[%d] Group名称:[%s]", param.getClusterPhyId(), param.getGroupName()), - String.format("删除Offset:[%s]", ConvertUtil.obj2Json(param)) - ); - opLogWrapService.saveOplogAndIgnoreException(oplogDTO); + // 记录操作 + OplogDTO oplogDTO = new OplogDTO(operator, + OperationEnum.DELETE.getDesc(), + ModuleEnum.KAFKA_GROUP.getDesc(), + String.format("集群ID:[%d] Group名称:[%s]", param.getClusterPhyId(), param.getGroupName()), + String.format("删除Offset:[%s]", ConvertUtil.obj2Json(param)) + ); + opLogWrapService.saveOplogAndIgnoreException(oplogDTO); - // 清理Group数据 - this.deleteGroupInDB(param.getClusterPhyId(), param.getGroupName()); - this.deleteGroupMemberInDB(param.getClusterPhyId(), param.getGroupName()); - } else if (DeleteGroupTypeEnum.GROUP_TOPIC.equals(param.getDeleteGroupTypeEnum())) { - // 记录操作 - DeleteGroupTopicParam topicParam = (DeleteGroupTopicParam) param; - OplogDTO oplogDTO = new OplogDTO(operator, - OperationEnum.DELETE.getDesc(), - ModuleEnum.KAFKA_GROUP.getDesc(), - String.format("集群ID:[%d] Group名称:[%s] Topic名称:[%s]", param.getClusterPhyId(), param.getGroupName(), topicParam.getTopicName()), - String.format("删除Offset:[%s]", ConvertUtil.obj2Json(topicParam)) - ); - opLogWrapService.saveOplogAndIgnoreException(oplogDTO); + // 清理Group数据 + this.deleteGroupInDB(param.getClusterPhyId(), param.getGroupName()); + this.deleteGroupMemberInDB(param.getClusterPhyId(), param.getGroupName()); - // 清理group + topic 数据 - this.deleteGroupMemberInDB(topicParam.getClusterPhyId(), topicParam.getGroupName(), topicParam.getTopicName()); - } else if (DeleteGroupTypeEnum.GROUP_TOPIC_PARTITION.equals(param.getDeleteGroupTypeEnum())) { - // 记录操作 - DeleteGroupTopicPartitionParam partitionParam = (DeleteGroupTopicPartitionParam) param; - OplogDTO oplogDTO = new OplogDTO(operator, - OperationEnum.DELETE.getDesc(), - ModuleEnum.KAFKA_GROUP.getDesc(), - String.format("集群ID:[%d] Group名称:[%s] Topic名称:[%s] PartitionID:[%d]", param.getClusterPhyId(), param.getGroupName(), partitionParam.getTopicName(), partitionParam.getPartitionId()), - String.format("删除Offset:[%s]", ConvertUtil.obj2Json(partitionParam)) - ); - opLogWrapService.saveOplogAndIgnoreException(oplogDTO); + return rv; + } catch (VCHandlerNotExistException e) { + return Result.buildFailure(VC_HANDLE_NOT_EXIST); + } + } - // 不需要进行清理 + @Override + public Result deleteGroupTopicOffset(DeleteGroupTopicParam param, String operator) { + // 日志记录 + LOGGER.info("method=deleteGroupTopicOffset||param={}||operator={}||msg=delete group topic offset", ConvertUtil.obj2Json(param), operator); + + try { + Result rv = (Result) doVCHandler(param.getClusterPhyId(), DELETE_GROUP_TOPIC_OFFSET, param); + if (rv == null || rv.failed()) { + return rv; } + // 清理数据库中的数据 + // 记录操作 + OplogDTO oplogDTO = new OplogDTO(operator, + OperationEnum.DELETE.getDesc(), + ModuleEnum.KAFKA_GROUP.getDesc(), + String.format("集群ID:[%d] Group名称:[%s] Topic名称:[%s]", param.getClusterPhyId(), param.getGroupName(), param.getTopicName()), + String.format("删除Offset:[%s]", ConvertUtil.obj2Json(param)) + ); + opLogWrapService.saveOplogAndIgnoreException(oplogDTO); + + // 清理group + topic 数据 + this.deleteGroupMemberInDB(param.getClusterPhyId(), param.getGroupName(), param.getTopicName()); + + return rv; + } catch (VCHandlerNotExistException e) { + return Result.buildFailure(VC_HANDLE_NOT_EXIST); + } + } + + @Override + public Result deleteGroupTopicPartitionOffset(DeleteGroupTopicPartitionParam param, String operator) { + // 日志记录 + LOGGER.info("method=deleteGroupTopicPartitionOffset||param={}||operator={}||msg=delete group topic partition offset", ConvertUtil.obj2Json(param), operator); + + try { + Result rv = (Result) doVCHandler(param.getClusterPhyId(), DELETE_GROUP_TP_OFFSET, param); + if (rv == null || rv.failed()) { + return rv; + } + + // 记录操作 + OplogDTO oplogDTO = new OplogDTO(operator, + OperationEnum.DELETE.getDesc(), + ModuleEnum.KAFKA_GROUP.getDesc(), + String.format("集群ID:[%d] Group名称:[%s] Topic名称:[%s] PartitionID:[%d]", param.getClusterPhyId(), param.getGroupName(), param.getTopicName(), param.getPartitionId()), + String.format("删除Offset:[%s]", ConvertUtil.obj2Json(param)) + ); + opLogWrapService.saveOplogAndIgnoreException(oplogDTO); + return rv; } catch (VCHandlerNotExistException e) { return Result.buildFailure(VC_HANDLE_NOT_EXIST); @@ -130,20 +161,6 @@ public class OpGroupServiceImpl extends BaseKafkaVersionControlService implement /**************************************************** private method ****************************************************/ private Result deleteGroupOffsetByClient(VersionItemParam itemParam) { - DeleteGroupParam deleteGroupParam = (DeleteGroupParam) itemParam; - - if (DeleteGroupTypeEnum.GROUP.equals(deleteGroupParam.getDeleteGroupTypeEnum())) { - return this.deleteGroupByClient(itemParam); - } else if (DeleteGroupTypeEnum.GROUP_TOPIC.equals(deleteGroupParam.getDeleteGroupTypeEnum())) { - return this.deleteGroupTopicOffsetByClient(itemParam); - } else if (DeleteGroupTypeEnum.GROUP_TOPIC_PARTITION.equals(deleteGroupParam.getDeleteGroupTypeEnum())) { - return this.deleteGroupTopicPartitionOffsetByClient(itemParam); - } - - return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "删除Offset时,删除的类型参数非法"); - } - - private Result deleteGroupByClient(VersionItemParam itemParam) { DeleteGroupParam param = (DeleteGroupParam) itemParam; try { AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId()); @@ -156,7 +173,7 @@ public class OpGroupServiceImpl extends BaseKafkaVersionControlService implement deleteConsumerGroupsResult.all().get(); } catch (Exception e) { LOGGER.error( - "method=deleteGroupByClient||clusterPhyId={}||groupName={}||errMsg=delete group failed||msg=exception!", + "method=deleteGroupOffsetByClient||clusterPhyId={}||groupName={}||errMsg=delete group failed||msg=exception!", param.getClusterPhyId(), param.getGroupName(), e ); @@ -172,7 +189,7 @@ public class OpGroupServiceImpl extends BaseKafkaVersionControlService implement AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId()); DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList( - param.getTopicName()), + param.getTopicName()), new DescribeTopicsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS) ); diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/fe/FrontEndControlVersionItems.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/fe/FrontEndControlVersionItems.java index a56b9b0e..18cdb44e 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/fe/FrontEndControlVersionItems.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/fe/FrontEndControlVersionItems.java @@ -39,8 +39,12 @@ public class FrontEndControlVersionItems extends BaseMetricVersionMetric { private static final String FE_TRUNCATE_TOPIC = "FETruncateTopic"; private static final String FE_DELETE_GROUP_OFFSET = "FEDeleteGroupOffset"; + private static final String FE_DELETE_GROUP_TOPIC_OFFSET = "FEDeleteGroupTopicOffset"; + private static final String FE_DELETE_GROUP_TOPIC_PARTITION_OFFSET = "FEDeleteGroupTopicPartitionOffset"; - public FrontEndControlVersionItems(){} + public FrontEndControlVersionItems() { + // ignore + } @Override public int versionItemType() { @@ -97,9 +101,13 @@ public class FrontEndControlVersionItems extends BaseMetricVersionMetric { itemList.add(buildItem().minVersion(VersionEnum.V_0_11_0_0).maxVersion(VersionEnum.V_MAX) .name(FE_TRUNCATE_TOPIC).desc("清空Topic")); - // truncate topic - itemList.add(buildItem().minVersion(VersionEnum.V_1_1_0).maxVersion(VersionEnum.V_MAX) + // 删除Offset + itemList.add(buildItem().minVersion(VersionEnum.V_2_0_0).maxVersion(VersionEnum.V_MAX) .name(FE_DELETE_GROUP_OFFSET).desc("删除GroupOffset")); + itemList.add(buildItem().minVersion(VersionEnum.V_2_4_0).maxVersion(VersionEnum.V_MAX) + .name(FE_DELETE_GROUP_TOPIC_OFFSET).desc("删除GroupTopicOffset")); + itemList.add(buildItem().minVersion(VersionEnum.V_2_4_0).maxVersion(VersionEnum.V_MAX) + .name(FE_DELETE_GROUP_TOPIC_PARTITION_OFFSET).desc("删除GroupTopicPartitionOffset")); return itemList; } } diff --git a/km-persistence/src/main/resources/sql/dml-logi.sql b/km-persistence/src/main/resources/sql/dml-logi.sql index b19c9410..2beff22e 100644 --- a/km-persistence/src/main/resources/sql/dml-logi.sql +++ b/km-persistence/src/main/resources/sql/dml-logi.sql @@ -147,3 +147,13 @@ INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_del INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2040', '0', 'know-streaming'); INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2042', '0', 'know-streaming'); INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2044', '0', 'know-streaming'); + + +-- 多集群管理权限2023-07-06新增 +INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2046', 'Group-删除', '1593', '1', '2', 'Group-删除', '0', 'know-streaming'); +INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2048', 'GroupOffset-Topic纬度删除', '1593', '1', '2', 'GroupOffset-Topic纬度删除', '0', 'know-streaming'); +INSERT INTO `logi_security_permission` (`id`, `permission_name`, `parent_id`, `leaf`, `level`, `description`, `is_delete`, `app_name`) VALUES ('2050', 'GroupOffset-Partition纬度删除', '1593', '1', '2', 'GroupOffset-Partition纬度删除', '0', 'know-streaming'); + +INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2046', '0', 'know-streaming'); +INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2048', '0', 'know-streaming'); +INSERT INTO `logi_security_role_permission` (`role_id`, `permission_id`, `is_delete`, `app_name`) VALUES ('1677', '2050', '0', 'know-streaming');