diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/GroupManager.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/GroupManager.java index 5a6d3ac6..a6b41eef 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/GroupManager.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/GroupManager.java @@ -1,6 +1,7 @@ package com.xiaojukeji.know.streaming.km.biz.group; import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterGroupSummaryDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetDeleteDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetResetDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; @@ -39,5 +40,7 @@ public interface GroupManager { Result resetGroupOffsets(GroupOffsetResetDTO dto, String operator) throws Exception; + Result deleteGroupOffsets(GroupOffsetDeleteDTO dto, String operator) throws Exception; + List getGroupTopicOverviewVOList(Long clusterPhyId, List groupMemberPOList); } diff --git a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java index fd05c218..c80c4020 100644 --- a/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java +++ b/km-biz/src/main/java/com/xiaojukeji/know/streaming/km/biz/group/impl/GroupManagerImpl.java @@ -4,6 +4,7 @@ import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.biz.group.GroupManager; import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterGroupSummaryDTO; +import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetDeleteDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetResetDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationSortDTO; @@ -17,6 +18,9 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSMemberConsume import com.xiaojukeji.know.streaming.km.common.bean.entity.kafka.KSMemberDescription; import com.xiaojukeji.know.streaming.km.common.bean.entity.metrics.GroupMetrics; import com.xiaojukeji.know.streaming.km.common.bean.entity.offset.KSOffsetSpec; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupTopicParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupTopicPartitionParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.topic.Topic; @@ -32,6 +36,7 @@ import com.xiaojukeji.know.streaming.km.common.converter.GroupConverter; import com.xiaojukeji.know.streaming.km.common.enums.AggTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.OffsetTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.SortTypeEnum; +import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.group.GroupStateEnum; import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException; import com.xiaojukeji.know.streaming.km.common.exception.NotExistException; @@ -42,6 +47,7 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.group.GroupMetricService; import com.xiaojukeji.know.streaming.km.core.service.group.GroupService; +import com.xiaojukeji.know.streaming.km.core.service.group.OpGroupService; import com.xiaojukeji.know.streaming.km.core.service.partition.PartitionService; import com.xiaojukeji.know.streaming.km.core.service.topic.TopicService; import com.xiaojukeji.know.streaming.km.core.service.version.metrics.kafka.GroupMetricVersionItems; @@ -58,7 +64,7 @@ import static com.xiaojukeji.know.streaming.km.common.enums.group.GroupTypeEnum. @Component public class GroupManagerImpl implements GroupManager { - private static final ILog log = LogFactory.getLog(GroupManagerImpl.class); + private static final ILog LOGGER = LogFactory.getLog(GroupManagerImpl.class); @Autowired private TopicService topicService; @@ -66,6 +72,9 @@ public class GroupManagerImpl implements GroupManager { @Autowired private GroupService groupService; + @Autowired + private OpGroupService opGroupService; + @Autowired private PartitionService partitionService; @@ -246,6 +255,52 @@ public class GroupManagerImpl implements GroupManager { return groupService.resetGroupOffsets(dto.getClusterId(), dto.getGroupName(), offsetMapResult.getData(), operator); } + @Override + public Result deleteGroupOffsets(GroupOffsetDeleteDTO dto, String operator) throws Exception { + ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(dto.getClusterPhyId()); + if (clusterPhy == null) { + return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(dto.getClusterPhyId())); + } + + + // 按照group纬度进行删除 + if (ValidateUtils.isBlank(dto.getGroupName())) { + return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "groupName不允许为空"); + } + if (DeleteGroupTypeEnum.GROUP.getCode().equals(dto.getDeleteType())) { + return opGroupService.deleteGroupOffset( + new DeleteGroupParam(dto.getClusterPhyId(), dto.getGroupName(), DeleteGroupTypeEnum.GROUP), + operator + ); + } + + + // 按照topic纬度进行删除 + if (ValidateUtils.isBlank(dto.getTopicName())) { + return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "topicName不允许为空"); + } + if (DeleteGroupTypeEnum.GROUP_TOPIC.getCode().equals(dto.getDeleteType())) { + return opGroupService.deleteGroupOffset( + new DeleteGroupTopicParam(dto.getClusterPhyId(), dto.getGroupName(), DeleteGroupTypeEnum.GROUP, dto.getTopicName()), + operator + ); + } + + + // 按照partition纬度进行删除 + if (ValidateUtils.isNullOrLessThanZero(dto.getPartitionId())) { + return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "partitionId不允许为空或小于0"); + } + if (DeleteGroupTypeEnum.GROUP_TOPIC_PARTITION.getCode().equals(dto.getDeleteType())) { + return opGroupService.deleteGroupOffset( + new DeleteGroupTopicPartitionParam(dto.getClusterPhyId(), dto.getGroupName(), DeleteGroupTypeEnum.GROUP, dto.getTopicName(), dto.getPartitionId()), + operator + ); + } + + return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "deleteType类型错误"); + } + @Override public List getGroupTopicOverviewVOList(Long clusterPhyId, List groupMemberPOList) { // 获取指标 @@ -257,7 +312,7 @@ public class GroupManagerImpl implements GroupManager { ); if (metricsListResult.failed()) { // 如果查询失败,则输出错误信息,但是依旧进行已有数据的返回 - log.error("method=completeMetricData||clusterPhyId={}||result={}||errMsg=search es failed", clusterPhyId, metricsListResult); + LOGGER.error("method=completeMetricData||clusterPhyId={}||result={}||errMsg=search es failed", clusterPhyId, metricsListResult); } return this.convert2GroupTopicOverviewVOList(groupMemberPOList, metricsListResult.getData()); } diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/group/GroupOffsetDeleteDTO.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/group/GroupOffsetDeleteDTO.java new file mode 100644 index 00000000..03cb61c6 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/dto/group/GroupOffsetDeleteDTO.java @@ -0,0 +1,40 @@ +package com.xiaojukeji.know.streaming.km.common.bean.dto.group; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.xiaojukeji.know.streaming.km.common.bean.dto.BaseDTO; +import io.swagger.annotations.ApiModelProperty; +import lombok.Data; + +import javax.validation.constraints.Min; +import javax.validation.constraints.NotBlank; +import javax.validation.constraints.NotNull; + +/** + * 删除offset + * @author zengqiao + * @date 19/4/8 + */ +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class GroupOffsetDeleteDTO extends BaseDTO { + @Min(value = 0, message = "clusterPhyId不允许为null或者小于0") + @ApiModelProperty(value = "集群ID", example = "6") + private Long clusterPhyId; + + @NotBlank(message = "groupName不允许为空") + @ApiModelProperty(value = "消费组名称", example = "g-know-streaming") + private String groupName; + + @ApiModelProperty(value = "Topic名称,按照Topic纬度进行删除时需要传", example = "know-streaming") + protected String topicName; + + @ApiModelProperty(value = "分区ID,按照分区纬度进行删除时需要传") + private Integer partitionId; + + /** + * @see com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum + */ + @NotNull(message = "deleteType不允许为空") + @ApiModelProperty(value = "删除类型", example = "0:group纬度,1:Topic纬度,2:Partition纬度") + private Integer deleteType; +} \ No newline at end of file diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/group/DeleteGroupParam.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/group/DeleteGroupParam.java new file mode 100644 index 00000000..3c9360a5 --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/group/DeleteGroupParam.java @@ -0,0 +1,16 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.param.group; + +import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +public class DeleteGroupParam extends GroupParam { + protected DeleteGroupTypeEnum deleteGroupTypeEnum; + + public DeleteGroupParam(Long clusterPhyId, String groupName, DeleteGroupTypeEnum deleteGroupTypeEnum) { + super(clusterPhyId, groupName); + this.deleteGroupTypeEnum = deleteGroupTypeEnum; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/group/DeleteGroupTopicParam.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/group/DeleteGroupTopicParam.java new file mode 100644 index 00000000..c72fd97c --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/group/DeleteGroupTopicParam.java @@ -0,0 +1,16 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.param.group; + +import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +public class DeleteGroupTopicParam extends DeleteGroupParam { + protected String topicName; + + public DeleteGroupTopicParam(Long clusterPhyId, String groupName, DeleteGroupTypeEnum deleteGroupTypeEnum, String topicName) { + super(clusterPhyId, groupName, deleteGroupTypeEnum); + this.topicName = topicName; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/group/DeleteGroupTopicPartitionParam.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/group/DeleteGroupTopicPartitionParam.java new file mode 100644 index 00000000..e2f049cb --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/group/DeleteGroupTopicPartitionParam.java @@ -0,0 +1,16 @@ +package com.xiaojukeji.know.streaming.km.common.bean.entity.param.group; + +import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +public class DeleteGroupTopicPartitionParam extends DeleteGroupTopicParam { + protected Integer partitionId; + + public DeleteGroupTopicPartitionParam(Long clusterPhyId, String groupName, DeleteGroupTypeEnum deleteGroupTypeEnum, String topicName, Integer partitionId) { + super(clusterPhyId, groupName, deleteGroupTypeEnum, topicName); + this.partitionId = partitionId; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/group/GroupParam.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/group/GroupParam.java index d7bf15f8..4f7552d9 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/group/GroupParam.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/param/group/GroupParam.java @@ -1,13 +1,11 @@ package com.xiaojukeji.know.streaming.km.common.bean.entity.param.group; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; -import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @NoArgsConstructor -@AllArgsConstructor public class GroupParam extends ClusterPhyParam { protected String groupName; diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/group/DeleteGroupTypeEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/group/DeleteGroupTypeEnum.java new file mode 100644 index 00000000..ef99344c --- /dev/null +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/group/DeleteGroupTypeEnum.java @@ -0,0 +1,28 @@ +package com.xiaojukeji.know.streaming.km.common.enums.group; + +import lombok.Getter; + + +/** + * @author wyb + * @date 2022/10/11 + */ +@Getter +public enum DeleteGroupTypeEnum { + UNKNOWN(-1, "Unknown"), + + GROUP(0, "Group纬度"), + + GROUP_TOPIC(1, "GroupTopic纬度"), + + GROUP_TOPIC_PARTITION(2, "GroupTopicPartition纬度"); + + private final Integer code; + + private final String msg; + + DeleteGroupTypeEnum(Integer code, String msg) { + this.code = code; + this.msg = msg; + } +} diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java index 7bcf3234..d11c3bfa 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java +++ b/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/enums/version/VersionItemTypeEnum.java @@ -41,6 +41,8 @@ public enum VersionItemTypeEnum { SERVICE_OP_REASSIGNMENT(330, "service_reassign_operation"), + SERVICE_OP_GROUP(340, "service_group_operation"), + SERVICE_OP_CONNECT_CLUSTER(400, "service_connect_cluster_operation"), SERVICE_OP_CONNECT_CONNECTOR(401, "service_connect_connector_operation"), SERVICE_OP_CONNECT_PLUGIN(402, "service_connect_plugin_operation"), diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/OpGroupService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/OpGroupService.java new file mode 100644 index 00000000..83285a48 --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/OpGroupService.java @@ -0,0 +1,11 @@ +package com.xiaojukeji.know.streaming.km.core.service.group; + +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; + +public interface OpGroupService { + /** + * 删除Topic + */ + Result deleteGroupOffset(DeleteGroupParam param, String operator); +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/OpGroupServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/OpGroupServiceImpl.java new file mode 100644 index 00000000..b59519b9 --- /dev/null +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/group/impl/OpGroupServiceImpl.java @@ -0,0 +1,255 @@ +package com.xiaojukeji.know.streaming.km.core.service.group.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.didiglobal.logi.log.ILog; +import com.didiglobal.logi.log.LogFactory; +import com.didiglobal.logi.security.common.dto.oplog.OplogDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupTopicParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.param.group.DeleteGroupTopicPartitionParam; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; +import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; +import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupMemberPO; +import com.xiaojukeji.know.streaming.km.common.bean.po.group.GroupPO; +import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; +import com.xiaojukeji.know.streaming.km.common.enums.group.DeleteGroupTypeEnum; +import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum; +import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum; +import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; +import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException; +import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; +import com.xiaojukeji.know.streaming.km.core.service.group.OpGroupService; +import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService; +import com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService; +import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; +import com.xiaojukeji.know.streaming.km.persistence.mysql.group.GroupDAO; +import com.xiaojukeji.know.streaming.km.persistence.mysql.group.GroupMemberDAO; +import org.apache.kafka.clients.admin.*; +import org.apache.kafka.common.TopicPartition; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import javax.annotation.PostConstruct; +import java.util.*; +import java.util.stream.Collectors; + +import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.VC_HANDLE_NOT_EXIST; +import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*; +import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.SERVICE_OP_GROUP; + +/** + * @author didi + */ +@Service +public class OpGroupServiceImpl extends BaseKafkaVersionControlService implements OpGroupService { + private static final ILog LOGGER = LogFactory.getLog(OpGroupServiceImpl.class); + + private static final String DELETE_GROUP_OFFSET = "deleteGroupOffset"; + + @Autowired + private GroupDAO groupDAO; + + @Autowired + private GroupMemberDAO groupMemberDAO; + + @Autowired + private OpLogWrapService opLogWrapService; + + @Autowired + private KafkaAdminClient kafkaAdminClient; + + @Override + protected VersionItemTypeEnum getVersionItemType() { + return SERVICE_OP_GROUP; + } + + @PostConstruct + private void init() { + registerVCHandler(DELETE_GROUP_OFFSET, V_1_1_0, V_MAX, "deleteGroupOffset", this::deleteGroupOffsetByClient); + } + + @Override + public Result deleteGroupOffset(DeleteGroupParam param, String operator) { + // 日志记录 + LOGGER.info("method=deleteGroupOffset||param={}||operator={}||msg=delete group offset", ConvertUtil.obj2Json(param), operator); + + try { + Result rv = (Result) doVCHandler(param.getClusterPhyId(), DELETE_GROUP_OFFSET, param); + if (rv == null || rv.failed()) { + return rv; + } + + // 清理数据库中的数据 + if (DeleteGroupTypeEnum.GROUP.equals(param.getDeleteGroupTypeEnum())) { + // 记录操作 + OplogDTO oplogDTO = new OplogDTO(operator, + OperationEnum.DELETE.getDesc(), + ModuleEnum.KAFKA_GROUP.getDesc(), + String.format("集群ID:[%d] Group名称:[%s]", param.getClusterPhyId(), param.getGroupName()), + String.format("删除Offset:[%s]", ConvertUtil.obj2Json(param)) + ); + opLogWrapService.saveOplogAndIgnoreException(oplogDTO); + + // 清理Group数据 + this.deleteGroupInDB(param.getClusterPhyId(), param.getGroupName()); + this.deleteGroupMemberInDB(param.getClusterPhyId(), param.getGroupName()); + } else if (DeleteGroupTypeEnum.GROUP_TOPIC.equals(param.getDeleteGroupTypeEnum())) { + // 记录操作 + DeleteGroupTopicParam topicParam = (DeleteGroupTopicParam) param; + OplogDTO oplogDTO = new OplogDTO(operator, + OperationEnum.DELETE.getDesc(), + ModuleEnum.KAFKA_GROUP.getDesc(), + String.format("集群ID:[%d] Group名称:[%s] Topic名称:[%s]", param.getClusterPhyId(), param.getGroupName(), topicParam.getTopicName()), + String.format("删除Offset:[%s]", ConvertUtil.obj2Json(topicParam)) + ); + opLogWrapService.saveOplogAndIgnoreException(oplogDTO); + + // 清理group + topic 数据 + this.deleteGroupMemberInDB(topicParam.getClusterPhyId(), topicParam.getGroupName(), topicParam.getTopicName()); + } else if (DeleteGroupTypeEnum.GROUP_TOPIC_PARTITION.equals(param.getDeleteGroupTypeEnum())) { + // 记录操作 + DeleteGroupTopicPartitionParam partitionParam = (DeleteGroupTopicPartitionParam) param; + OplogDTO oplogDTO = new OplogDTO(operator, + OperationEnum.DELETE.getDesc(), + ModuleEnum.KAFKA_GROUP.getDesc(), + String.format("集群ID:[%d] Group名称:[%s] Topic名称:[%s] PartitionID:[%d]", param.getClusterPhyId(), param.getGroupName(), partitionParam.getTopicName(), partitionParam.getPartitionId()), + String.format("删除Offset:[%s]", ConvertUtil.obj2Json(partitionParam)) + ); + opLogWrapService.saveOplogAndIgnoreException(oplogDTO); + + // 不需要进行清理 + } + + return rv; + } catch (VCHandlerNotExistException e) { + return Result.buildFailure(VC_HANDLE_NOT_EXIST); + } + } + + /**************************************************** private method ****************************************************/ + + private Result deleteGroupOffsetByClient(VersionItemParam itemParam) { + DeleteGroupParam deleteGroupParam = (DeleteGroupParam) itemParam; + + if (DeleteGroupTypeEnum.GROUP.equals(deleteGroupParam.getDeleteGroupTypeEnum())) { + return this.deleteGroupByClient(itemParam); + } else if (DeleteGroupTypeEnum.GROUP_TOPIC.equals(deleteGroupParam.getDeleteGroupTypeEnum())) { + return this.deleteGroupTopicOffsetByClient(itemParam); + } else if (DeleteGroupTypeEnum.GROUP_TOPIC_PARTITION.equals(deleteGroupParam.getDeleteGroupTypeEnum())) { + return this.deleteGroupTopicPartitionOffsetByClient(itemParam); + } + + return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, "删除Offset时,删除的类型参数非法"); + } + + private Result deleteGroupByClient(VersionItemParam itemParam) { + DeleteGroupParam param = (DeleteGroupParam) itemParam; + try { + AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId()); + + DeleteConsumerGroupsResult deleteConsumerGroupsResult = adminClient.deleteConsumerGroups( + Collections.singletonList(param.getGroupName()), + new DeleteConsumerGroupsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS) + ); + + deleteConsumerGroupsResult.all().get(); + } catch (Exception e) { + LOGGER.error( + "method=deleteGroupByClient||clusterPhyId={}||groupName={}||errMsg=delete group failed||msg=exception!", + param.getClusterPhyId(), param.getGroupName(), e + ); + + return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage()); + } + + return Result.buildSuc(); + } + + private Result deleteGroupTopicOffsetByClient(VersionItemParam itemParam) { + DeleteGroupTopicParam param = (DeleteGroupTopicParam) itemParam; + try { + AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId()); + + DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Collections.singletonList( + param.getTopicName()), + new DescribeTopicsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS) + ); + + List tpList = describeTopicsResult + .all() + .get() + .get(param.getTopicName()) + .partitions() + .stream() + .map(elem -> new TopicPartition(param.getTopicName(), elem.partition())) + .collect(Collectors.toList()); + + DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = adminClient.deleteConsumerGroupOffsets( + param.getGroupName(), + new HashSet<>(tpList), + new DeleteConsumerGroupOffsetsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS) + ); + + deleteConsumerGroupOffsetsResult.all().get(); + } catch (Exception e) { + LOGGER.error( + "method=deleteGroupTopicOffsetByClient||clusterPhyId={}||groupName={}||topicName={}||errMsg=delete group failed||msg=exception!", + param.getClusterPhyId(), param.getGroupName(), param.getTopicName(), e + ); + + return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage()); + } + + return Result.buildSuc(); + } + + private Result deleteGroupTopicPartitionOffsetByClient(VersionItemParam itemParam) { + DeleteGroupTopicPartitionParam param = (DeleteGroupTopicPartitionParam) itemParam; + try { + AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId()); + + DeleteConsumerGroupOffsetsResult deleteConsumerGroupOffsetsResult = adminClient.deleteConsumerGroupOffsets( + param.getGroupName(), + new HashSet<>(Arrays.asList(new TopicPartition(param.getTopicName(), param.getPartitionId()))), + new DeleteConsumerGroupOffsetsOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS) + ); + + deleteConsumerGroupOffsetsResult.all().get(); + } catch (Exception e) { + LOGGER.error( + "method=deleteGroupTopicPartitionOffsetByClient||clusterPhyId={}||groupName={}||topicName={}||partitionId={}||errMsg=delete group failed||msg=exception!", + param.getClusterPhyId(), param.getGroupName(), param.getTopicName(), param.getPartitionId(), e + ); + + return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage()); + } + + return Result.buildSuc(); + } + + private int deleteGroupInDB(Long clusterPhyId, String groupName) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(GroupPO::getClusterPhyId, clusterPhyId); + lambdaQueryWrapper.eq(GroupPO::getName, groupName); + + return groupDAO.delete(lambdaQueryWrapper); + } + + private int deleteGroupMemberInDB(Long clusterPhyId, String groupName) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId); + lambdaQueryWrapper.eq(GroupMemberPO::getGroupName, groupName); + + return groupMemberDAO.delete(lambdaQueryWrapper); + } + + private int deleteGroupMemberInDB(Long clusterPhyId, String groupName, String topicName) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(GroupMemberPO::getClusterPhyId, clusterPhyId); + lambdaQueryWrapper.eq(GroupMemberPO::getGroupName, groupName); + lambdaQueryWrapper.eq(GroupMemberPO::getTopicName, topicName); + + return groupMemberDAO.delete(lambdaQueryWrapper); + } +} diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/fe/FrontEndControlVersionItems.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/fe/FrontEndControlVersionItems.java index db8baa4a..a56b9b0e 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/fe/FrontEndControlVersionItems.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/version/fe/FrontEndControlVersionItems.java @@ -38,6 +38,8 @@ public class FrontEndControlVersionItems extends BaseMetricVersionMetric { private static final String FE_TRUNCATE_TOPIC = "FETruncateTopic"; + private static final String FE_DELETE_GROUP_OFFSET = "FEDeleteGroupOffset"; + public FrontEndControlVersionItems(){} @Override @@ -91,10 +93,13 @@ public class FrontEndControlVersionItems extends BaseMetricVersionMetric { itemList.add(buildItem().minVersion(VersionEnum.V_2_5_0_D_300).maxVersion(VersionEnum.V_2_5_0_D_MAX) .name(FE_HA_DELETE_MIRROR_TOPIC).desc("HA-取消Topic复制")); - //truncate topic + // truncate topic itemList.add(buildItem().minVersion(VersionEnum.V_0_11_0_0).maxVersion(VersionEnum.V_MAX) - .name(FE_TRUNCATE_TOPIC).desc("清空topic")); + .name(FE_TRUNCATE_TOPIC).desc("清空Topic")); + // truncate topic + itemList.add(buildItem().minVersion(VersionEnum.V_1_1_0).maxVersion(VersionEnum.V_MAX) + .name(FE_DELETE_GROUP_OFFSET).desc("删除GroupOffset")); return itemList; } } diff --git a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/group/GroupController.java b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/group/GroupController.java index 55e7e778..9233be88 100644 --- a/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/group/GroupController.java +++ b/km-rest/src/main/java/com/xiaojukeji/know/streaming/km/rest/api/v3/group/GroupController.java @@ -2,6 +2,7 @@ package com.xiaojukeji.know.streaming.km.rest.api.v3.group; import com.didiglobal.logi.security.util.HttpRequestUtil; import com.xiaojukeji.know.streaming.km.biz.group.GroupManager; +import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetDeleteDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupOffsetResetDTO; import com.xiaojukeji.know.streaming.km.common.bean.dto.group.GroupTopicConsumedDTO; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.PaginationResult; @@ -32,13 +33,20 @@ public class GroupController { @Autowired private GroupService groupService; - @ApiOperation(value = "重置组消费偏移", notes = "") + @ApiOperation(value = "重置消费偏移", notes = "") @PutMapping(value = "group-offsets") @ResponseBody public Result resetGroupOffsets(@Validated @RequestBody GroupOffsetResetDTO dto) throws Exception { return groupManager.resetGroupOffsets(dto, HttpRequestUtil.getOperator()); } + @ApiOperation(value = "删除消费偏移", notes = "") + @DeleteMapping(value = "group-offsets") + @ResponseBody + public Result deleteGroupOffsets(@Validated @RequestBody GroupOffsetDeleteDTO dto) throws Exception { + return groupManager.deleteGroupOffsets(dto, HttpRequestUtil.getOperator()); + } + @ApiOperation(value = "Group-Topic指标信息", notes = "") @PostMapping(value = "clusters/{clusterId}/topics/{topicName}/groups/{groupName}/metric") @ResponseBody