From 78f625dc8c12bbe7d9704e9419460b2d8e4eb45d Mon Sep 17 00:00:00 2001 From: zengqiao Date: Mon, 29 Aug 2022 16:45:48 +0800 Subject: [PATCH] =?UTF-8?q?=E6=97=A5=E5=BF=97=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/acl/impl/KafkaAclServiceImpl.java | 18 +++++++++++ .../impl/KafkaChangeRecordServiceImpl.java | 30 +++++++++++++++++-- .../cluster/impl/ClusterPhyServiceImpl.java | 2 +- .../kafkauser/impl/KafkaUserServiceImpl.java | 19 +++++++++++- 4 files changed, 65 insertions(+), 4 deletions(-) diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/impl/KafkaAclServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/impl/KafkaAclServiceImpl.java index 5c52af61..836c7d56 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/impl/KafkaAclServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/impl/KafkaAclServiceImpl.java @@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.core.service.acl.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; @@ -10,10 +11,12 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; +import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterAuthTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService; +import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService; import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; @@ -58,6 +61,9 @@ public class KafkaAclServiceImpl extends BaseVersionControlService implements Ka @Autowired private KafkaAdminZKClient kafkaAdminZKClient; + @Autowired + private ClusterPhyService clusterPhyService; + @Override protected VersionItemTypeEnum getVersionItemType() { return VersionItemTypeEnum.SERVICE_OP_ACL; @@ -175,6 +181,18 @@ public class KafkaAclServiceImpl extends BaseVersionControlService implements Ka private Result> getAclByKafkaClient(VersionItemParam itemParam) { ClusterPhyParam param = (ClusterPhyParam) itemParam; try { + // 获取集群 + ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(param.getClusterPhyId()); + if (clusterPhy == null) { + return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(param.getClusterPhyId())); + } + + // 判断是否开启认证 + if (!ClusterAuthTypeEnum.enableAuth(clusterPhy.getAuthType())) { + log.warn("method=getAclByKafkaClient||clusterPhyId={}||msg=not open auth and ignore get acls", clusterPhy.getId()); + return Result.buildSuc(new ArrayList<>()); + } + AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId()); DescribeAclsResult describeAclsResult = diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/change/record/impl/KafkaChangeRecordServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/change/record/impl/KafkaChangeRecordServiceImpl.java index 0c4eefe7..40265a8f 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/change/record/impl/KafkaChangeRecordServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/change/record/impl/KafkaChangeRecordServiceImpl.java @@ -5,14 +5,19 @@ import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; import com.xiaojukeji.know.streaming.km.common.bean.po.changerecord.KafkaChangeRecordPO; import com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService; import com.xiaojukeji.know.streaming.km.persistence.mysql.changerecord.KafkaChangeRecordDAO; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Service; +import java.util.List; +import java.util.concurrent.TimeUnit; + + @Service public class KafkaChangeRecordServiceImpl implements KafkaChangeRecordService { private static final ILog log = LogFactory.getLog(KafkaChangeRecordServiceImpl.class); @@ -20,11 +25,24 @@ public class KafkaChangeRecordServiceImpl implements KafkaChangeRecordService { @Autowired private KafkaChangeRecordDAO kafkaChangeRecordDAO; + private static final Cache recordCache = Caffeine.newBuilder() + .expireAfterWrite(12, TimeUnit.HOURS) + .maximumSize(1000) + .build(); + @Override public int insertAndIgnoreDuplicate(KafkaChangeRecordPO recordPO) { try { + String cacheData = recordCache.getIfPresent(recordPO.getUniqueField()); + if (cacheData != null || this.checkExistInDB(recordPO.getUniqueField())) { + // 已存在时,则直接返回 + return 0; + } + + recordCache.put(recordPO.getUniqueField(), recordPO.getUniqueField()); + return kafkaChangeRecordDAO.insert(recordPO); - } catch (DuplicateKeyException dke) { + } catch (Exception e) { return 0; } } @@ -40,4 +58,12 @@ public class KafkaChangeRecordServiceImpl implements KafkaChangeRecordService { /**************************************************** private method ****************************************************/ + private boolean checkExistInDB(String uniqueField) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(KafkaChangeRecordPO::getUniqueField, uniqueField); + + List poList = kafkaChangeRecordDAO.selectList(lambdaQueryWrapper); + + return poList != null && !poList.isEmpty(); + } } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java index 562645c0..d7c355ef 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/cluster/impl/ClusterPhyServiceImpl.java @@ -111,7 +111,7 @@ public class ClusterPhyServiceImpl implements ClusterPhyService { throw new DuplicateException(String.format("clusterName:%s duplicated", clusterPhyPO.getName())); } catch (Exception e) { - log.error("cmethod=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster failed||errMsg=exception!", clusterPhyPO.getId(), operator, e); + log.error("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster failed||errMsg=exception!", clusterPhyPO.getId(), operator, e); throw new AdminOperateException("add cluster failed", e, ResultStatus.MYSQL_OPERATE_FAILED); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkauser/impl/KafkaUserServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkauser/impl/KafkaUserServiceImpl.java index 7e5fa91f..e939f00d 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkauser/impl/KafkaUserServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/kafkauser/impl/KafkaUserServiceImpl.java @@ -7,6 +7,7 @@ import com.didiglobal.logi.log.LogFactory; import com.didiglobal.logi.security.common.dto.oplog.OplogDTO; import com.didiglobal.logi.security.util.PWEncryptUtil; import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkauser.KafkaUser; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.kafkauser.KafkaUserParam; @@ -17,11 +18,13 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaUserPO; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; +import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterAuthTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum; import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException; import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; +import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.kafkauser.KafkaUserService; import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService; @@ -32,7 +35,6 @@ import kafka.admin.ConfigCommand; import kafka.server.ConfigType; import kafka.zk.*; import org.apache.kafka.clients.admin.*; -import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.security.scram.ScramCredential; import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils; import org.apache.kafka.common.security.scram.internals.ScramFormatter; @@ -71,6 +73,9 @@ public class KafkaUserServiceImpl extends BaseVersionControlService implements K @Autowired private OpLogWrapService opLogWrapService; + @Autowired + private ClusterPhyService clusterPhyService; + @Override protected VersionItemTypeEnum getVersionItemType() { return VersionItemTypeEnum.SERVICE_OP_KAFKA_USER; @@ -571,6 +576,18 @@ public class KafkaUserServiceImpl extends BaseVersionControlService implements K private Result> getKafkaUserByKafkaClient(VersionItemParam itemParam) { KafkaUserParam param = (KafkaUserParam) itemParam; try { + // 获取集群 + ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(param.getClusterPhyId()); + if (clusterPhy == null) { + return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(param.getClusterPhyId())); + } + + // 判断认证模式,如果是非scram模式,直接返回 + if (!ClusterAuthTypeEnum.isScram(clusterPhy.getAuthType())) { + log.warn("method=getKafkaUserByKafkaClient||clusterPhyId={}||msg=not scram auth type and ignore get users", clusterPhy.getId()); + return Result.buildSuc(new ArrayList<>()); + } + AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId()); // 查询集群kafka-user