Merge pull request #520 from didi/dev_v3.0.0

1、删除无效字段;2、日志优化;
This commit is contained in:
EricZeng
2022-08-29 20:33:55 +08:00
committed by GitHub
5 changed files with 65 additions and 5 deletions

View File

@@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.core.service.acl.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
@@ -10,10 +11,12 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterAuthTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
@@ -58,6 +61,9 @@ public class KafkaAclServiceImpl extends BaseVersionControlService implements Ka
@Autowired
private KafkaAdminZKClient kafkaAdminZKClient;
@Autowired
private ClusterPhyService clusterPhyService;
@Override
protected VersionItemTypeEnum getVersionItemType() {
return VersionItemTypeEnum.SERVICE_OP_ACL;
@@ -175,6 +181,18 @@ public class KafkaAclServiceImpl extends BaseVersionControlService implements Ka
private Result<List<AclBinding>> getAclByKafkaClient(VersionItemParam itemParam) {
ClusterPhyParam param = (ClusterPhyParam) itemParam;
try {
// 获取集群
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(param.getClusterPhyId());
if (clusterPhy == null) {
return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(param.getClusterPhyId()));
}
// 判断是否开启认证
if (!ClusterAuthTypeEnum.enableAuth(clusterPhy.getAuthType())) {
log.warn("method=getAclByKafkaClient||clusterPhyId={}||msg=not open auth and ignore get acls", clusterPhy.getId());
return Result.buildSuc(new ArrayList<>());
}
AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId());
DescribeAclsResult describeAclsResult =

View File

@@ -5,14 +5,19 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.po.changerecord.KafkaChangeRecordPO;
import com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService;
import com.xiaojukeji.know.streaming.km.persistence.mysql.changerecord.KafkaChangeRecordDAO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Service
public class KafkaChangeRecordServiceImpl implements KafkaChangeRecordService {
private static final ILog log = LogFactory.getLog(KafkaChangeRecordServiceImpl.class);
@@ -20,11 +25,24 @@ public class KafkaChangeRecordServiceImpl implements KafkaChangeRecordService {
@Autowired
private KafkaChangeRecordDAO kafkaChangeRecordDAO;
private static final Cache<String, String> recordCache = Caffeine.newBuilder()
.expireAfterWrite(12, TimeUnit.HOURS)
.maximumSize(1000)
.build();
// Inserts a Kafka change record, skipping duplicates via a two-level check:
// an in-process cache lookup first, then a DB query on the unique field.
// Returns the number of rows inserted (0 when the record already exists or on failure).
@Override
public int insertAndIgnoreDuplicate(KafkaChangeRecordPO recordPO) {
    try {
        // Fast path: the local cache remembers unique-field values seen recently,
        // avoiding a DB round-trip for repeated records.
        String cacheData = recordCache.getIfPresent(recordPO.getUniqueField());
        if (cacheData != null || this.checkExistInDB(recordPO.getUniqueField())) {
            // Record already exists — return without inserting.
            return 0;
        }
        recordCache.put(recordPO.getUniqueField(), recordPO.getUniqueField());
        return kafkaChangeRecordDAO.insert(recordPO);
    } catch (DuplicateKeyException dke) {
        // NOTE(review): this catch body appears empty in this diff view — the
        // original handling lines may have been elided by the diff rendering;
        // confirm against the full source before assuming a silent swallow.
    } catch (Exception e) {
        // Best-effort insert: any other failure is treated as "not inserted".
        // NOTE(review): exception is swallowed without logging — confirm intended.
        return 0;
    }
}
@@ -40,4 +58,12 @@ public class KafkaChangeRecordServiceImpl implements KafkaChangeRecordService {
/**************************************************** private method ****************************************************/
/**
 * Checks whether a change record with the given unique-field value is
 * already persisted in the database.
 *
 * @param uniqueField value of the record's unique-field column
 * @return {@code true} if at least one matching row exists
 */
private boolean checkExistInDB(String uniqueField) {
    LambdaQueryWrapper<KafkaChangeRecordPO> queryWrapper = new LambdaQueryWrapper<>();
    queryWrapper.eq(KafkaChangeRecordPO::getUniqueField, uniqueField);
    List<KafkaChangeRecordPO> records = kafkaChangeRecordDAO.selectList(queryWrapper);
    return !(records == null || records.isEmpty());
}
}

View File

@@ -111,7 +111,7 @@ public class ClusterPhyServiceImpl implements ClusterPhyService {
throw new DuplicateException(String.format("clusterName:%s duplicated", clusterPhyPO.getName()));
} catch (Exception e) {
log.error("cmethod=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster failed||errMsg=exception!", clusterPhyPO.getId(), operator, e);
log.error("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster failed||errMsg=exception!", clusterPhyPO.getId(), operator, e);
throw new AdminOperateException("add cluster failed", e, ResultStatus.MYSQL_OPERATE_FAILED);
}

View File

@@ -7,6 +7,7 @@ import com.didiglobal.logi.log.LogFactory;
import com.didiglobal.logi.security.common.dto.oplog.OplogDTO;
import com.didiglobal.logi.security.util.PWEncryptUtil;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkauser.KafkaUser;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.kafkauser.KafkaUserParam;
@@ -17,11 +18,13 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaUserPO;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterAuthTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum;
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.kafkauser.KafkaUserService;
import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
@@ -32,7 +35,6 @@ import kafka.admin.ConfigCommand;
import kafka.server.ConfigType;
import kafka.zk.*;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
import org.apache.kafka.common.security.scram.internals.ScramFormatter;
@@ -71,6 +73,9 @@ public class KafkaUserServiceImpl extends BaseVersionControlService implements K
@Autowired
private OpLogWrapService opLogWrapService;
@Autowired
private ClusterPhyService clusterPhyService;
@Override
protected VersionItemTypeEnum getVersionItemType() {
return VersionItemTypeEnum.SERVICE_OP_KAFKA_USER;
@@ -571,6 +576,18 @@ public class KafkaUserServiceImpl extends BaseVersionControlService implements K
private Result<List<KafkaUser>> getKafkaUserByKafkaClient(VersionItemParam itemParam) {
KafkaUserParam param = (KafkaUserParam) itemParam;
try {
// 获取集群
ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(param.getClusterPhyId());
if (clusterPhy == null) {
return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(param.getClusterPhyId()));
}
// 判断认证模式如果是非scram模式直接返回
if (!ClusterAuthTypeEnum.isScram(clusterPhy.getAuthType())) {
log.warn("method=getKafkaUserByKafkaClient||clusterPhyId={}||msg=not scram auth type and ignore get users", clusterPhy.getId());
return Result.buildSuc(new ArrayList<>());
}
AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId());
// 查询集群kafka-user

View File

@@ -51,7 +51,6 @@ CREATE TABLE `ks_km_cluster_balance_job` (
`total_reassign_size` double NOT NULL DEFAULT '0' COMMENT '总迁移大小',
`total_reassign_replica_num` int(16) NOT NULL DEFAULT '0' COMMENT '总迁移副本数',
`move_in_topic_list` varchar(4096) NOT NULL DEFAULT '' COMMENT '移入topic',
`move_broker_list` varchar(1024) NOT NULL DEFAULT '' COMMENT '移除节点',
`broker_balance_detail` text COMMENT '节点均衡详情',
`status` int(16) NOT NULL DEFAULT '0' COMMENT '任务状态 1进行中2准备3成功4失败5取消',
`creator` varchar(64) NOT NULL DEFAULT '' COMMENT '操作人',