From a7309612d56be26bdcb4ad1f186bd5b5b836a130 Mon Sep 17 00:00:00 2001 From: EricZeng Date: Tue, 15 Aug 2023 18:46:41 +0800 Subject: [PATCH] =?UTF-8?q?[Optimize]=E7=BB=9F=E4=B8=80DB=E5=85=83?= =?UTF-8?q?=E4=BF=A1=E6=81=AF=E6=9B=B4=E6=96=B0=E6=A0=BC=E5=BC=8F-Part2=20?= =?UTF-8?q?(#1127)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1、KafkaMetaService修改为MetaService并移动到Core层; 2、修改ZK、KafkaACL的格式; --- .../km/core/service/acl/KafkaAclService.java | 11 +-- .../core/service/acl/OpKafkaAclService.java | 13 --- .../service/acl/impl/KafkaAclServiceImpl.java | 90 ++++++++++++------- .../acl/impl/OpKafkaAclServiceImpl.java | 37 -------- .../connect/connector/ConnectorService.java | 4 +- .../km/core/service/meta/MetaDataService.java | 19 ++-- .../service/zookeeper/ZookeeperService.java | 12 +-- .../zookeeper/impl/ZookeeperServiceImpl.java | 42 +++++---- .../task/kafka/metadata/SyncKafkaAclTask.java | 24 +---- .../kafka/metadata/SyncZookeeperTask.java | 15 +--- 10 files changed, 108 insertions(+), 159 deletions(-) rename km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/meta/KafkaMetaService.java => km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/meta/MetaDataService.java (65%) diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/KafkaAclService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/KafkaAclService.java index 3e50771b..9e9735a9 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/KafkaAclService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/KafkaAclService.java @@ -1,15 +1,13 @@ package com.xiaojukeji.know.streaming.km.core.service.acl; -import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO; +import com.xiaojukeji.know.streaming.km.core.service.meta.MetaDataService; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.resource.ResourceType; import java.util.List; -public interface KafkaAclService { - Result> getAclFromKafka(Long clusterPhyId); - +public interface KafkaAclService extends MetaDataService { List getKafkaAclFromDB(Long clusterPhyId); Integer countKafkaAclFromDB(Long clusterPhyId); @@ -17,10 +15,5 @@ public interface KafkaAclService { Integer countResTypeAndDistinctFromDB(Long clusterPhyId, ResourceType resourceType); Integer countKafkaUserAndDistinctFromDB(Long clusterPhyId); - - List getKafkaResTypeAclFromDB(Long clusterPhyId, Integer resType); - List getTopicAclFromDB(Long clusterPhyId, String topicName); - - List getGroupAclFromDB(Long clusterPhyId, String groupName); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/OpKafkaAclService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/OpKafkaAclService.java index 7dd59c75..f6129326 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/OpKafkaAclService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/OpKafkaAclService.java @@ -3,10 +3,6 @@ package com.xiaojukeji.know.streaming.km.core.service.acl; import com.xiaojukeji.know.streaming.km.common.bean.entity.param.acl.ACLAtomParam; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO; -import org.apache.kafka.common.resource.ResourceType; - -import java.util.Date; -import java.util.List; public interface OpKafkaAclService { /** @@ -19,14 +15,5 @@ public interface OpKafkaAclService { */ Result deleteKafkaAcl(ACLAtomParam aclAtomParam, String operator); - /** - * 删除ACL - */ - Result deleteKafkaAclByResName(ResourceType resourceType, String resourceName, String operator); - Result insertAndIgnoreDuplicate(KafkaAclPO kafkaAclPO); - - void batchUpdateAcls(Long clusterPhyId, List poList); - - int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/impl/KafkaAclServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/impl/KafkaAclServiceImpl.java index 8f1473cd..65258044 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/impl/KafkaAclServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/impl/KafkaAclServiceImpl.java @@ -11,6 +11,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO; import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant; import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant; +import com.xiaojukeji.know.streaming.km.common.converter.KafkaAclConverter; import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterAuthTypeEnum; import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum; import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException; @@ -18,8 +19,6 @@ import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils; import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService; import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService; -import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService; -import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient; import com.xiaojukeji.know.streaming.km.persistence.mysql.KafkaAclDAO; @@ -36,11 +35,13 @@ import org.apache.kafka.common.resource.ResourceType; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.utils.SecurityUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DuplicateKeyException; import org.springframework.stereotype.Service; import javax.annotation.PostConstruct; -import java.util.ArrayList; -import java.util.List; +import java.util.*; +import java.util.function.Function; +import java.util.stream.Collectors; import scala.jdk.javaapi.CollectionConverters; @@ -77,18 +78,49 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen } @Override - public Result> getAclFromKafka(Long clusterPhyId) { - if (LoadedClusterPhyCache.getByPhyId(clusterPhyId) == null) { - return Result.buildFromRSAndMsg(ResultStatus.NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId)); - } - + public Result> getDataFromKafka(ClusterPhy clusterPhy) { try { - return (Result>) versionControlService.doHandler(getVersionItemType(), getMethodName(clusterPhyId, ACL_GET_FROM_KAFKA), new ClusterPhyParam(clusterPhyId)); + Result> dataResult = (Result>) versionControlService.doHandler(getVersionItemType(), getMethodName(clusterPhy.getId(), ACL_GET_FROM_KAFKA), new ClusterPhyParam(clusterPhy.getId())); + if (dataResult.failed()) { + Result.buildFromIgnoreData(dataResult); + } + + return Result.buildSuc(dataResult.getData()); } catch (VCHandlerNotExistException e) { return Result.buildFailure(e.getResultStatus()); } } + @Override + public void writeToDB(Long clusterPhyId, List dataList) { + Map dbPOMap = this.getKafkaAclFromDB(clusterPhyId).stream().collect(Collectors.toMap(KafkaAclPO::getUniqueField, Function.identity())); + + long now = System.currentTimeMillis(); + for (AclBinding aclBinding: dataList) { + KafkaAclPO newPO = KafkaAclConverter.convert2KafkaAclPO(clusterPhyId, aclBinding, now); + KafkaAclPO oldPO = dbPOMap.remove(newPO.getUniqueField()); + if (oldPO == null) { + // 新增的ACL + this.insertAndIgnoreDuplicate(newPO); + } + + // 不需要update + } + + // 删除已经不存在的 + for (KafkaAclPO dbPO: dbPOMap.values()) { + kafkaAclDAO.deleteById(dbPO); + } + } + + @Override + public int deleteInDBByKafkaClusterId(Long clusterPhyId) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(KafkaAclPO::getClusterPhyId, clusterPhyId); + + return kafkaAclDAO.delete(lambdaQueryWrapper); + } + @Override public List getKafkaAclFromDB(Long clusterPhyId) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); @@ -116,7 +148,7 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen return 0; } - return (int)poList.stream().map(elem -> elem.getResourceName()).distinct().count(); + return (int)poList.stream().map(KafkaAclPO::getResourceName).distinct().count(); } @Override @@ -130,15 +162,7 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen return 0; } - return (int)poList.stream().map(elem -> elem.getPrincipal()).distinct().count(); - } - - @Override - public List getKafkaResTypeAclFromDB(Long clusterPhyId, Integer resType) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); - queryWrapper.eq(KafkaAclPO::getClusterPhyId, clusterPhyId); - queryWrapper.eq(KafkaAclPO::getResourceType, resType); - return kafkaAclDAO.selectList(queryWrapper); + return (int)poList.stream().map(KafkaAclPO::getPrincipal).distinct().count(); } @Override @@ -152,15 +176,6 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen return kafkaAclDAO.selectList(queryWrapper); } - @Override - public List getGroupAclFromDB(Long clusterPhyId, String groupName) { - LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); - queryWrapper.eq(KafkaAclPO::getClusterPhyId, clusterPhyId); - queryWrapper.eq(KafkaAclPO::getResourceType, ResourceType.GROUP.code()); - queryWrapper.eq(KafkaAclPO::getResourceName, groupName); - return kafkaAclDAO.selectList(queryWrapper); - } - /**************************************************** private method ****************************************************/ private Result> getAclByZKClient(VersionItemParam itemParam){ @@ -170,7 +185,7 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen for (ZkAclStore store: CollectionConverters.asJava(ZkAclStore.stores())) { Result> rl = this.getSpecifiedTypeAclByZKClient(param.getClusterPhyId(), store.patternType()); if (rl.failed()) { - return rl; + return Result.buildFromIgnoreData(rl); } aclList.addAll(rl.getData()); @@ -229,4 +244,19 @@ public class KafkaAclServiceImpl extends BaseKafkaVersionControlService implemen return Result.buildSuc(kafkaAclList); } + + private Result insertAndIgnoreDuplicate(KafkaAclPO kafkaAclPO) { + try { + kafkaAclDAO.insert(kafkaAclPO); + + return Result.buildSuc(); + } catch (DuplicateKeyException dke) { + // 直接写入,如果出现key冲突则直接忽略,因为key冲突时,表示该数据已完整存在,不需要替换任何数据 + return Result.buildSuc(); + } catch (Exception e) { + log.error("method=insertAndIgnoreDuplicate||kafkaAclPO={}||errMsg=exception", kafkaAclPO, e); + + return Result.buildFromRSAndMsg(ResultStatus.MYSQL_OPERATE_FAILED, e.getMessage()); + } + } } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/impl/OpKafkaAclServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/impl/OpKafkaAclServiceImpl.java index a8fab1f1..c3915cde 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/impl/OpKafkaAclServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/acl/impl/OpKafkaAclServiceImpl.java @@ -20,7 +20,6 @@ import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistExcept import com.xiaojukeji.know.streaming.km.core.service.acl.OpKafkaAclService; import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService; import com.xiaojukeji.know.streaming.km.core.service.version.BaseKafkaVersionControlService; -import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient; import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient; import com.xiaojukeji.know.streaming.km.persistence.mysql.KafkaAclDAO; @@ -32,7 +31,6 @@ import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.acl.*; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourcePatternFilter; -import org.apache.kafka.common.resource.ResourceType; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.dao.DuplicateKeyException; @@ -41,8 +39,6 @@ import scala.jdk.javaapi.CollectionConverters; import javax.annotation.PostConstruct; import java.util.*; -import java.util.function.Function; -import java.util.stream.Collectors; import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*; @@ -169,11 +165,6 @@ public class OpKafkaAclServiceImpl extends BaseKafkaVersionControlService implem return rv; } - @Override - public Result deleteKafkaAclByResName(ResourceType resourceType, String resourceName, String operator) { - return Result.buildSuc(); - } - @Override public Result insertAndIgnoreDuplicate(KafkaAclPO kafkaAclPO) { try { @@ -190,34 +181,6 @@ public class OpKafkaAclServiceImpl extends BaseKafkaVersionControlService implem } } - @Override - public void batchUpdateAcls(Long clusterPhyId, List poList) { - LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); - lambdaQueryWrapper.eq(KafkaAclPO::getClusterPhyId, clusterPhyId); - - Map dbPOMap = kafkaAclDAO.selectList(lambdaQueryWrapper).stream().collect(Collectors.toMap(KafkaAclPO::getUniqueField, Function.identity())); - for (KafkaAclPO po: poList) { - KafkaAclPO dbPO = dbPOMap.remove(po.getUniqueField()); - if (dbPO == null) { - // 新增的ACL - this.insertAndIgnoreDuplicate(po); - } - } - - // 删除已经不存在的 - for (KafkaAclPO dbPO: dbPOMap.values()) { - kafkaAclDAO.deleteById(dbPO); - } - } - - @Override - public int deleteByUpdateTimeBeforeInDB(Long clusterPhyId, Date beforeTime) { - LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); - lambdaQueryWrapper.eq(KafkaAclPO::getClusterPhyId, clusterPhyId); - lambdaQueryWrapper.le(KafkaAclPO::getUpdateTime, beforeTime); - return kafkaAclDAO.delete(lambdaQueryWrapper); - } - /**************************************************** private method ****************************************************/ private Result deleteInDB(KafkaAclPO kafkaAclPO) { diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/ConnectorService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/ConnectorService.java index 7a85ffd2..05fe2cf9 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/ConnectorService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/connect/connector/ConnectorService.java @@ -4,7 +4,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluste import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnector; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorInfo; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.connector.KSConnectorStateInfo; -import com.xiaojukeji.know.streaming.km.common.bean.entity.meta.KafkaMetaService; +import com.xiaojukeji.know.streaming.km.core.service.meta.MetaDataService; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.po.connect.ConnectorPO; import com.xiaojukeji.know.streaming.km.common.enums.connect.ConnectorTypeEnum; @@ -14,7 +14,7 @@ import java.util.List; /** * 查看Connector */ -public interface ConnectorService extends KafkaMetaService { +public interface ConnectorService extends MetaDataService { /** * 获取所有的连接器名称列表 */ diff --git a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/meta/KafkaMetaService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/meta/MetaDataService.java similarity index 65% rename from km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/meta/KafkaMetaService.java rename to km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/meta/MetaDataService.java index d0307afc..b1c34dbf 100644 --- a/km-common/src/main/java/com/xiaojukeji/know/streaming/km/common/bean/entity/meta/KafkaMetaService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/meta/MetaDataService.java @@ -1,4 +1,4 @@ -package com.xiaojukeji.know.streaming.km.common.bean.entity.meta; +package com.xiaojukeji.know.streaming.km.core.service.meta; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.connect.ConnectCluster; @@ -13,7 +13,7 @@ import java.util.Set; /** * Kafka元信息服务接口 */ -public interface KafkaMetaService { +public interface MetaDataService { /** * 从Kafka中获取数据 * @param connectCluster connect集群 @@ -26,19 +26,26 @@ public interface KafkaMetaService { * @param clusterPhy kafka集群 * @return 全部资源集合, 成功的资源列表 */ - default Result, List>> getDataFromKafka(ClusterPhy clusterPhy) { return Result.buildSuc(new Tuple<>(new HashSet<>(), new ArrayList<>())); } + default Result> getDataFromKafka(ClusterPhy clusterPhy) { return Result.buildSuc(new ArrayList<>()); } /** * 元信息同步至DB中 * @param clusterId 集群ID - * @param fullNameSet 全部资源列表 + * @param fullResSet 全部资源列表 * @param dataList 成功的资源列表 */ - default void writeToDB(Long clusterId, Set fullNameSet, List dataList) {} + default void writeToDB(Long clusterId, Set fullResSet, List dataList) {} + + /** + * 元信息同步至DB中 + * @param clusterId 集群ID + * @param dataList 成功的资源列表 + */ + default void writeToDB(Long clusterId, List dataList) {} /** * 依据kafka集群ID删除数据 * @param clusterPhyId kafka集群ID */ - default int deleteInDBByKafkaClusterId(Long clusterPhyId) { return 0; } + int deleteInDBByKafkaClusterId(Long clusterPhyId); } diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZookeeperService.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZookeeperService.java index 8d3a78b1..1d324928 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZookeeperService.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/ZookeeperService.java @@ -1,19 +1,11 @@ package com.xiaojukeji.know.streaming.km.core.service.zookeeper; -import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; -import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo; +import com.xiaojukeji.know.streaming.km.core.service.meta.MetaDataService; import java.util.List; -public interface ZookeeperService { - /** - * 从ZK集群中获取ZK信息 - */ - Result> listFromZookeeper(Long clusterPhyId, String zookeeperAddress, ZKConfig zkConfig); - - void batchReplaceDataInDB(Long clusterPhyId, List infoList); - +public interface ZookeeperService extends MetaDataService { List listFromDBByCluster(Long clusterPhyId); /** diff --git a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperServiceImpl.java b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperServiceImpl.java index 8b0d63d1..dc2f58d2 100644 --- a/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperServiceImpl.java +++ b/km-core/src/main/java/com/xiaojukeji/know/streaming/km/core/service/zookeeper/impl/ZookeeperServiceImpl.java @@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.core.service.zookeeper.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.didiglobal.logi.log.ILog; import com.didiglobal.logi.log.LogFactory; +import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus; @@ -22,10 +23,8 @@ import com.xiaojukeji.know.streaming.km.persistence.mysql.zookeeper.ZookeeperDAO import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; +import java.util.stream.Collectors; @Service public class ZookeeperServiceImpl implements ZookeeperService { @@ -35,14 +34,14 @@ public class ZookeeperServiceImpl implements ZookeeperService { private ZookeeperDAO zookeeperDAO; @Override - public Result> listFromZookeeper(Long clusterPhyId, String zookeeperAddress, ZKConfig zkConfig) { + public Result> getDataFromKafka(ClusterPhy clusterPhy) { List> addressList = null; try { - addressList = ZookeeperUtils.connectStringParser(zookeeperAddress); + addressList = ZookeeperUtils.connectStringParser(clusterPhy.getZookeeper()); } catch (Exception e) { LOGGER.error( - "method=listFromZookeeperCluster||clusterPhyId={}||zookeeperAddress={}||errMsg=exception!", - clusterPhyId, zookeeperAddress, e + "method=getDataFromKafka||clusterPhyId={}||zookeeperAddress={}||errMsg=exception!", + clusterPhy.getId(), clusterPhy.getZookeeper(), e ); return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, e.getMessage()); @@ -51,24 +50,25 @@ public class ZookeeperServiceImpl implements ZookeeperService { List aliveZKList = new ArrayList<>(); for (Tuple hostPort: addressList) { aliveZKList.add(this.getFromZookeeperCluster( - clusterPhyId, + clusterPhy.getId(), hostPort.getV1(), hostPort.getV2(), - zkConfig + ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class) )); } + return Result.buildSuc(aliveZKList); } @Override - public void batchReplaceDataInDB(Long clusterPhyId, List infoList) { + public void writeToDB(Long clusterId, List dataList) { // DB 中的信息 - List dbInfoList = this.listRawFromDBByCluster(clusterPhyId); - Map dbMap = new HashMap<>(); - dbInfoList.stream().forEach(elem -> dbMap.put(elem.getHost() + elem.getPort(), elem)); + Map dbMap = this.listRawFromDBByCluster(clusterId) + .stream() + .collect(Collectors.toMap(elem -> elem.getHost() + elem.getPort(), elem -> elem, (oldValue, newValue) -> newValue)); // 新获取到的信息 - List newInfoList = ConvertUtil.list2List(infoList, ZookeeperInfoPO.class); + List newInfoList = ConvertUtil.list2List(dataList, ZookeeperInfoPO.class); for (ZookeeperInfoPO newInfo: newInfoList) { try { ZookeeperInfoPO oldInfo = dbMap.remove(newInfo.getHost() + newInfo.getPort()); @@ -87,7 +87,7 @@ public class ZookeeperServiceImpl implements ZookeeperService { zookeeperDAO.updateById(newInfo); } } catch (Exception e) { - LOGGER.error("method=batchReplaceDataInDB||clusterPhyId={}||newInfo={}||errMsg=exception", clusterPhyId, newInfo, e); + LOGGER.error("method=writeToDB||clusterPhyId={}||newInfo={}||errMsg=exception", clusterId, newInfo, e); } } @@ -96,11 +96,19 @@ public class ZookeeperServiceImpl implements ZookeeperService { try { zookeeperDAO.deleteById(entry.getValue().getId()); } catch (Exception e) { - LOGGER.error("method=batchReplaceDataInDB||clusterPhyId={}||expiredInfo={}||errMsg=exception", clusterPhyId, entry.getValue(), e); + LOGGER.error("method=writeToDB||clusterPhyId={}||expiredInfo={}||errMsg=exception", clusterId, entry.getValue(), e); } }); } + @Override + public int deleteInDBByKafkaClusterId(Long clusterPhyId) { + LambdaQueryWrapper lambdaQueryWrapper = new LambdaQueryWrapper<>(); + lambdaQueryWrapper.eq(ZookeeperInfoPO::getClusterPhyId, clusterPhyId); + + return zookeeperDAO.delete(lambdaQueryWrapper); + } + @Override public List listFromDBByCluster(Long clusterPhyId) { return ConvertUtil.list2List(this.listRawFromDBByCluster(clusterPhyId), ZookeeperInfo.class); diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncKafkaAclTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncKafkaAclTask.java index 0dd5b8f7..d0b4f3ba 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncKafkaAclTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncKafkaAclTask.java @@ -3,19 +3,13 @@ package com.xiaojukeji.know.streaming.km.task.kafka.metadata; import com.didiglobal.logi.job.annotation.Task; import com.didiglobal.logi.job.common.TaskResult; import com.didiglobal.logi.job.core.consensual.ConsensualEnum; -import com.didiglobal.logi.log.ILog; -import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; -import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO; -import com.xiaojukeji.know.streaming.km.common.converter.KafkaAclConverter; import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService; -import com.xiaojukeji.know.streaming.km.core.service.acl.OpKafkaAclService; import org.apache.kafka.common.acl.AclBinding; import org.springframework.beans.factory.annotation.Autowired; import java.util.List; -import java.util.stream.Collectors; @Task(name = "SyncKafkaAclTask", description = "KafkaAcl信息同步到DB", @@ -24,32 +18,18 @@ import java.util.stream.Collectors; consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) public class SyncKafkaAclTask extends AbstractAsyncMetadataDispatchTask { - private static final ILog log = LogFactory.getLog(SyncKafkaAclTask.class); - @Autowired private KafkaAclService kafkaAclService; - @Autowired - private OpKafkaAclService opKafkaAclService; - @Override public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { - Result> aclBindingListResult = kafkaAclService.getAclFromKafka(clusterPhy.getId()); + Result> aclBindingListResult = kafkaAclService.getDataFromKafka(clusterPhy); if (aclBindingListResult.failed()) { return TaskResult.FAIL; } - if (!aclBindingListResult.hasData()) { - return TaskResult.SUCCESS; - } + kafkaAclService.writeToDB(clusterPhy.getId(), aclBindingListResult.getData()); - // 更新DB数据 - List poList = aclBindingListResult.getData() - .stream() - .map(elem -> KafkaAclConverter.convert2KafkaAclPO(clusterPhy.getId(), elem, triggerTimeUnitMs)) - .collect(Collectors.toList()); - - opKafkaAclService.batchUpdateAcls(clusterPhy.getId(), poList); return TaskResult.SUCCESS; } } diff --git a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncZookeeperTask.java b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncZookeeperTask.java index e87f2bf1..4ce988d3 100644 --- a/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncZookeeperTask.java +++ b/km-task/src/main/java/com/xiaojukeji/know/streaming/km/task/kafka/metadata/SyncZookeeperTask.java @@ -3,12 +3,8 @@ package com.xiaojukeji.know.streaming.km.task.kafka.metadata; import com.didiglobal.logi.job.annotation.Task; import com.didiglobal.logi.job.common.TaskResult; import com.didiglobal.logi.job.core.consensual.ConsensualEnum; -import com.didiglobal.logi.log.ILog; -import com.didiglobal.logi.log.LogFactory; import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy; -import com.xiaojukeji.know.streaming.km.common.bean.entity.config.ZKConfig; import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result; -import com.xiaojukeji.know.streaming.km.common.utils.ConvertUtil; import com.xiaojukeji.know.streaming.km.common.bean.entity.zookeeper.ZookeeperInfo; import com.xiaojukeji.know.streaming.km.core.service.zookeeper.ZookeeperService; import org.springframework.beans.factory.annotation.Autowired; @@ -23,24 +19,17 @@ import java.util.List; consensual = ConsensualEnum.BROADCAST, timeout = 2 * 60) public class SyncZookeeperTask extends AbstractAsyncMetadataDispatchTask { - private static final ILog log = LogFactory.getLog(SyncZookeeperTask.class); - @Autowired private ZookeeperService zookeeperService; @Override public TaskResult processClusterTask(ClusterPhy clusterPhy, long triggerTimeUnitMs) { - Result> infoResult = zookeeperService.listFromZookeeper( - clusterPhy.getId(), - clusterPhy.getZookeeper(), - ConvertUtil.str2ObjByJson(clusterPhy.getZkProperties(), ZKConfig.class) - ); - + Result> infoResult = zookeeperService.getDataFromKafka(clusterPhy); if (infoResult.failed()) { return new TaskResult(TaskResult.FAIL_CODE, infoResult.getMessage()); } - zookeeperService.batchReplaceDataInDB(clusterPhy.getId(), infoResult.getData()); + zookeeperService.writeToDB(clusterPhy.getId(), infoResult.getData()); return TaskResult.SUCCESS; }