mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
日志优化
This commit is contained in:
@@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.core.service.acl.impl;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
|
||||
@@ -10,10 +11,12 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterAuthTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
|
||||
@@ -58,6 +61,9 @@ public class KafkaAclServiceImpl extends BaseVersionControlService implements Ka
|
||||
@Autowired
|
||||
private KafkaAdminZKClient kafkaAdminZKClient;
|
||||
|
||||
@Autowired
|
||||
private ClusterPhyService clusterPhyService;
|
||||
|
||||
@Override
|
||||
protected VersionItemTypeEnum getVersionItemType() {
|
||||
return VersionItemTypeEnum.SERVICE_OP_ACL;
|
||||
@@ -175,6 +181,18 @@ public class KafkaAclServiceImpl extends BaseVersionControlService implements Ka
|
||||
private Result<List<AclBinding>> getAclByKafkaClient(VersionItemParam itemParam) {
|
||||
ClusterPhyParam param = (ClusterPhyParam) itemParam;
|
||||
try {
|
||||
// 获取集群
|
||||
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(param.getClusterPhyId());
|
||||
if (clusterPhy == null) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(param.getClusterPhyId()));
|
||||
}
|
||||
|
||||
// 判断是否开启认证
|
||||
if (!ClusterAuthTypeEnum.enableAuth(clusterPhy.getAuthType())) {
|
||||
log.warn("method=getAclByKafkaClient||clusterPhyId={}||msg=not open auth and ignore get acls", clusterPhy.getId());
|
||||
return Result.buildSuc(new ArrayList<>());
|
||||
}
|
||||
|
||||
AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId());
|
||||
|
||||
DescribeAclsResult describeAclsResult =
|
||||
|
||||
@@ -5,14 +5,19 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.didiglobal.logi.log.ILog;
|
||||
import com.didiglobal.logi.log.LogFactory;
|
||||
import com.github.benmanes.caffeine.cache.Cache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.changerecord.KafkaChangeRecordPO;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService;
|
||||
import com.xiaojukeji.know.streaming.km.persistence.mysql.changerecord.KafkaChangeRecordDAO;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.dao.DuplicateKeyException;
|
||||
import org.springframework.stereotype.Service;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
|
||||
@Service
|
||||
public class KafkaChangeRecordServiceImpl implements KafkaChangeRecordService {
|
||||
private static final ILog log = LogFactory.getLog(KafkaChangeRecordServiceImpl.class);
|
||||
@@ -20,11 +25,24 @@ public class KafkaChangeRecordServiceImpl implements KafkaChangeRecordService {
|
||||
@Autowired
|
||||
private KafkaChangeRecordDAO kafkaChangeRecordDAO;
|
||||
|
||||
private static final Cache<String, String> recordCache = Caffeine.newBuilder()
|
||||
.expireAfterWrite(12, TimeUnit.HOURS)
|
||||
.maximumSize(1000)
|
||||
.build();
|
||||
|
||||
@Override
|
||||
public int insertAndIgnoreDuplicate(KafkaChangeRecordPO recordPO) {
|
||||
try {
|
||||
String cacheData = recordCache.getIfPresent(recordPO.getUniqueField());
|
||||
if (cacheData != null || this.checkExistInDB(recordPO.getUniqueField())) {
|
||||
// 已存在时,则直接返回
|
||||
return 0;
|
||||
}
|
||||
|
||||
recordCache.put(recordPO.getUniqueField(), recordPO.getUniqueField());
|
||||
|
||||
return kafkaChangeRecordDAO.insert(recordPO);
|
||||
} catch (DuplicateKeyException dke) {
|
||||
} catch (Exception e) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
@@ -40,4 +58,12 @@ public class KafkaChangeRecordServiceImpl implements KafkaChangeRecordService {
|
||||
|
||||
/**************************************************** private method ****************************************************/
|
||||
|
||||
private boolean checkExistInDB(String uniqueField) {
|
||||
LambdaQueryWrapper<KafkaChangeRecordPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
|
||||
lambdaQueryWrapper.eq(KafkaChangeRecordPO::getUniqueField, uniqueField);
|
||||
|
||||
List<KafkaChangeRecordPO> poList = kafkaChangeRecordDAO.selectList(lambdaQueryWrapper);
|
||||
|
||||
return poList != null && !poList.isEmpty();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -111,7 +111,7 @@ public class ClusterPhyServiceImpl implements ClusterPhyService {
|
||||
|
||||
throw new DuplicateException(String.format("clusterName:%s duplicated", clusterPhyPO.getName()));
|
||||
} catch (Exception e) {
|
||||
log.error("cmethod=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster failed||errMsg=exception!", clusterPhyPO.getId(), operator, e);
|
||||
log.error("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster failed||errMsg=exception!", clusterPhyPO.getId(), operator, e);
|
||||
|
||||
throw new AdminOperateException("add cluster failed", e, ResultStatus.MYSQL_OPERATE_FAILED);
|
||||
}
|
||||
|
||||
@@ -7,6 +7,7 @@ import com.didiglobal.logi.log.LogFactory;
|
||||
import com.didiglobal.logi.security.common.dto.oplog.OplogDTO;
|
||||
import com.didiglobal.logi.security.util.PWEncryptUtil;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkauser.KafkaUser;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.kafkauser.KafkaUserParam;
|
||||
@@ -17,11 +18,13 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
|
||||
import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaUserPO;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterAuthTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
|
||||
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
|
||||
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.kafkauser.KafkaUserService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService;
|
||||
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
|
||||
@@ -32,7 +35,6 @@ import kafka.admin.ConfigCommand;
|
||||
import kafka.server.ConfigType;
|
||||
import kafka.zk.*;
|
||||
import org.apache.kafka.clients.admin.*;
|
||||
import org.apache.kafka.common.errors.UnsupportedVersionException;
|
||||
import org.apache.kafka.common.security.scram.ScramCredential;
|
||||
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
|
||||
import org.apache.kafka.common.security.scram.internals.ScramFormatter;
|
||||
@@ -71,6 +73,9 @@ public class KafkaUserServiceImpl extends BaseVersionControlService implements K
|
||||
@Autowired
|
||||
private OpLogWrapService opLogWrapService;
|
||||
|
||||
@Autowired
|
||||
private ClusterPhyService clusterPhyService;
|
||||
|
||||
@Override
|
||||
protected VersionItemTypeEnum getVersionItemType() {
|
||||
return VersionItemTypeEnum.SERVICE_OP_KAFKA_USER;
|
||||
@@ -571,6 +576,18 @@ public class KafkaUserServiceImpl extends BaseVersionControlService implements K
|
||||
private Result<List<KafkaUser>> getKafkaUserByKafkaClient(VersionItemParam itemParam) {
|
||||
KafkaUserParam param = (KafkaUserParam) itemParam;
|
||||
try {
|
||||
// 获取集群
|
||||
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(param.getClusterPhyId());
|
||||
if (clusterPhy == null) {
|
||||
return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(param.getClusterPhyId()));
|
||||
}
|
||||
|
||||
// 判断认证模式,如果是非scram模式,直接返回
|
||||
if (!ClusterAuthTypeEnum.isScram(clusterPhy.getAuthType())) {
|
||||
log.warn("method=getKafkaUserByKafkaClient||clusterPhyId={}||msg=not scram auth type and ignore get users", clusterPhy.getId());
|
||||
return Result.buildSuc(new ArrayList<>());
|
||||
}
|
||||
|
||||
AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId());
|
||||
|
||||
// 查询集群kafka-user
|
||||
|
||||
Reference in New Issue
Block a user