Merge pull request #536 from didi/master

Merge the master branch
This commit is contained in:
EricZeng
2022-09-01 17:02:32 +08:00
committed by GitHub
43 changed files with 499 additions and 164 deletions

View File

@@ -10,13 +10,13 @@ import java.util.concurrent.TimeUnit;
public class CollectedMetricsLocalCache {
private static final Cache<String, Float> brokerMetricsCache = Caffeine.newBuilder()
-.expireAfterWrite(60, TimeUnit.SECONDS)
-.maximumSize(2000)
+.expireAfterWrite(90, TimeUnit.SECONDS)
+.maximumSize(10000)
.build();
private static final Cache<String, List<TopicMetrics>> topicMetricsCache = Caffeine.newBuilder()
.expireAfterWrite(90, TimeUnit.SECONDS)
-.maximumSize(5000)
+.maximumSize(10000)
.build();
private static final Cache<String, List<PartitionMetrics>> partitionMetricsCache = Caffeine.newBuilder()
@@ -29,63 +29,64 @@ public class CollectedMetricsLocalCache {
.maximumSize(20000)
.build();
-public static Float getBrokerMetrics(Long clusterPhyId, Integer brokerId, String metricName) {
-return brokerMetricsCache.getIfPresent(CollectedMetricsLocalCache.genBrokerMetricKey(clusterPhyId, brokerId, metricName));
+public static Float getBrokerMetrics(String brokerMetricKey) {
+return brokerMetricsCache.getIfPresent(brokerMetricKey);
}
-public static void putBrokerMetrics(Long clusterPhyId, Integer brokerId, String metricName, Float value) {
+public static void putBrokerMetrics(String brokerMetricKey, Float value) {
if (value == null) {
return;
}
-brokerMetricsCache.put(CollectedMetricsLocalCache.genBrokerMetricKey(clusterPhyId, brokerId, metricName), value);
+brokerMetricsCache.put(brokerMetricKey, value);
}
-public static List<TopicMetrics> getTopicMetrics(Long clusterPhyId, String topicName, String metricName) {
-return topicMetricsCache.getIfPresent(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName));
+public static List<TopicMetrics> getTopicMetrics(String topicMetricKey) {
+return topicMetricsCache.getIfPresent(topicMetricKey);
}
-public static void putTopicMetrics(Long clusterPhyId, String topicName, String metricName, List<TopicMetrics> metricsList) {
+public static void putTopicMetrics(String topicMetricKey, List<TopicMetrics> metricsList) {
if (metricsList == null) {
return;
}
-topicMetricsCache.put(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName), metricsList);
+topicMetricsCache.put(topicMetricKey, metricsList);
}
-public static List<PartitionMetrics> getPartitionMetricsList(Long clusterPhyId, String topicName, String metricName) {
-return partitionMetricsCache.getIfPresent(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName));
+public static List<PartitionMetrics> getPartitionMetricsList(String partitionMetricKey) {
+return partitionMetricsCache.getIfPresent(partitionMetricKey);
}
-public static void putPartitionMetricsList(Long clusterPhyId, String topicName, String metricName, List<PartitionMetrics> metricsList) {
+public static void putPartitionMetricsList(String partitionMetricsKey, List<PartitionMetrics> metricsList) {
if (metricsList == null) {
return;
}
-partitionMetricsCache.put(CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName), metricsList);
+partitionMetricsCache.put(partitionMetricsKey, metricsList);
}
-public static Float getReplicaMetrics(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
-return replicaMetricsValueCache.getIfPresent(CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topicName, partitionId, metricName));
+public static Float getReplicaMetrics(String replicaMetricsKey) {
+return replicaMetricsValueCache.getIfPresent(replicaMetricsKey);
}
-public static void putReplicaMetrics(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName, Float value) {
+public static void putReplicaMetrics(String replicaMetricsKey, Float value) {
if (value == null) {
return;
}
-replicaMetricsValueCache.put(CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topicName, partitionId, metricName), value);
+replicaMetricsValueCache.put(replicaMetricsKey, value);
}
/**************************************************** private method ****************************************************/
-private static String genBrokerMetricKey(Long clusterPhyId, Integer brokerId, String metricName) {
+public static String genBrokerMetricKey(Long clusterPhyId, Integer brokerId, String metricName) {
return clusterPhyId + "@" + brokerId + "@" + metricName;
}
-private static String genClusterTopicMetricKey(Long clusterPhyId, String topicName, String metricName) {
+public static String genClusterTopicMetricKey(Long clusterPhyId, String topicName, String metricName) {
return clusterPhyId + "@" + topicName + "@" + metricName;
}
-private static String genReplicaMetricCacheKey(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
+public static String genReplicaMetricCacheKey(Long clusterPhyId, Integer brokerId, String topicName, Integer partitionId, String metricName) {
return clusterPhyId + "@" + brokerId + "@" + topicName + "@" + partitionId + "@" + metricName;
}
/**************************************************** private method ****************************************************/
}

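The shape of this refactor: the gen*Key helpers become public, callers build the cache key once, and the same string is handed to both the read and the write-back instead of every accessor re-deriving it. Alongside that, the broker cache TTL rises from 60 to 90 seconds and the maximum sizes grow. A minimal standalone sketch of the compute-key-once pattern with plain Caffeine (class name and values here are illustrative, not project code):

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.TimeUnit;

public class KeyOnceDemo {
    private static final Cache<String, Float> CACHE = Caffeine.newBuilder()
            .expireAfterWrite(90, TimeUnit.SECONDS)
            .maximumSize(10_000)
            .build();

    // Mirrors genBrokerMetricKey: clusterPhyId@brokerId@metricName
    static String genKey(Long clusterPhyId, Integer brokerId, String metricName) {
        return clusterPhyId + "@" + brokerId + "@" + metricName;
    }

    public static void main(String[] args) {
        // Build the key once, then reuse it for the miss check and the write-back.
        String key = genKey(1L, 2, "BytesIn");
        Float value = CACHE.getIfPresent(key);
        if (value == null) {
            value = 42.0f; // stand-in for a real JMX fetch
            CACHE.put(key, value);
        }
        System.out.println(key + " -> " + value);
    }
}
```

One consequence worth noting: the caller must pass a key built by the matching gen* helper, since the compiler can no longer tell a broker key from a topic key once both are plain strings.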
View File

@@ -3,6 +3,7 @@ package com.xiaojukeji.know.streaming.km.core.service.acl.impl;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.cluster.ClusterPhyParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
@@ -10,10 +11,12 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaAclPO;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
+import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterAuthTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
import com.xiaojukeji.know.streaming.km.core.service.acl.KafkaAclService;
+import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
import com.xiaojukeji.know.streaming.km.persistence.cache.LoadedClusterPhyCache;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
@@ -58,6 +61,9 @@ public class KafkaAclServiceImpl extends BaseVersionControlService implements Ka
@Autowired
private KafkaAdminZKClient kafkaAdminZKClient;
+@Autowired
+private ClusterPhyService clusterPhyService;
@Override
protected VersionItemTypeEnum getVersionItemType() {
return VersionItemTypeEnum.SERVICE_OP_ACL;
@@ -175,6 +181,18 @@ public class KafkaAclServiceImpl extends BaseVersionControlService implements Ka
private Result<List<AclBinding>> getAclByKafkaClient(VersionItemParam itemParam) {
ClusterPhyParam param = (ClusterPhyParam) itemParam;
try {
+// Get the cluster
+ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(param.getClusterPhyId());
+if (clusterPhy == null) {
+return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(param.getClusterPhyId()));
+}
+// Check whether authentication is enabled
+if (!ClusterAuthTypeEnum.enableAuth(clusterPhy.getAuthType())) {
+log.warn("method=getAclByKafkaClient||clusterPhyId={}||msg=not open auth and ignore get acls", clusterPhy.getId());
+return Result.buildSuc(new ArrayList<>());
+}
AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId());
DescribeAclsResult describeAclsResult =

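The hunk above cuts off right where the AdminClient call begins. For context, listing every ACL binding with the vanilla kafka-clients API looks roughly like this standalone sketch (the bootstrap address is a placeholder):

```java
import java.util.Collection;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeAclsOptions;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;

public class ListAclsDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // ANY matches every ACL entry; the timeout keeps a slow broker from stalling collection.
            Collection<AclBinding> acls = admin
                    .describeAcls(AclBindingFilter.ANY, new DescribeAclsOptions().timeoutMs(5000))
                    .values()
                    .get();
            acls.forEach(System.out::println);
        }
    }
}
```

On a cluster with no authorizer configured, describeAcls fails on every call (the broker reports security as disabled), which is presumably the noise the new enableAuth guard eliminates.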
View File

@@ -44,6 +44,7 @@ public interface BrokerService {
* Get the specified Broker
*/
Broker getBroker(Long clusterPhyId, Integer brokerId);
+Broker getBrokerFromCacheFirst(Long clusterPhyId, Integer brokerId);
/**
* Get the Broker log-dir info

View File

@@ -110,9 +110,10 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
}
@Override
-public Result<BrokerMetrics> collectBrokerMetricsFromKafkaWithCacheFirst(Long clusterId, Integer brokerId, String metric){
-Float keyValue = CollectedMetricsLocalCache.getBrokerMetrics(clusterId, brokerId, metric);
+public Result<BrokerMetrics> collectBrokerMetricsFromKafkaWithCacheFirst(Long clusterId, Integer brokerId, String metric) {
+String brokerMetricKey = CollectedMetricsLocalCache.genBrokerMetricKey(clusterId, brokerId, metric);
+Float keyValue = CollectedMetricsLocalCache.getBrokerMetrics(brokerMetricKey);
if(null != keyValue) {
BrokerMetrics brokerMetrics = new BrokerMetrics(clusterId, brokerId);
brokerMetrics.putMetric(metric, keyValue);
@@ -124,7 +125,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
Map<String, Float> metricsMap = ret.getData().getMetrics();
for(Map.Entry<String, Float> metricNameAndValueEntry : metricsMap.entrySet()){
-CollectedMetricsLocalCache.putBrokerMetrics(clusterId, brokerId, metricNameAndValueEntry.getKey(), metricNameAndValueEntry.getValue());
+CollectedMetricsLocalCache.putBrokerMetrics(brokerMetricKey, metricNameAndValueEntry.getValue());
}
return ret;
@@ -178,11 +179,16 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
@Override
public Result<List<MetricPointVO>> getMetricPointsFromES(Long clusterPhyId, Integer brokerId, MetricDTO dto) {
-Map<String/*metric*/, MetricPointVO> metricPointMap = brokerMetricESDAO.getBrokerMetricsPoint(clusterPhyId, brokerId,
-dto.getMetricsNames(), dto.getAggType(), dto.getStartTime(), dto.getEndTime());
+Map<String/*metric*/, MetricPointVO> metricPointMap = brokerMetricESDAO.getBrokerMetricsPoint(
+clusterPhyId,
+brokerId,
+dto.getMetricsNames(),
+dto.getAggType(),
+dto.getStartTime(),
+dto.getEndTime()
+);
-List<MetricPointVO> metricPoints = new ArrayList<>(metricPointMap.values());
-return Result.buildSuc(metricPoints);
+return Result.buildSuc(new ArrayList<>(metricPointMap.values()));
}
@Override
@@ -199,8 +205,10 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
brokerMetrics.add(ConvertUtil.obj2Obj(brokerMetricPO, BrokerMetrics.class));
} catch (Exception e) {
-LOGGER.error("method=getLatestMetricsFromES||clusterPhyId={}||brokerId={}||errMsg=exception",
-clusterPhyId, brokerId, e);
+LOGGER.error(
+"method=getLatestMetricsFromES||clusterPhyId={}||brokerId={}||errMsg=exception",
+clusterPhyId, brokerId, e
+);
}
}
@@ -219,6 +227,7 @@ public class BrokerMetricServiceImpl extends BaseMetricService implements Broker
}
/**************************************************** private method ****************************************************/
private List<Long> listTopNBrokerIds(Long clusterId, Integer topN){
List<Broker> brokers = brokerService.listAliveBrokersFromDB(clusterId);
if(CollectionUtils.isEmpty(brokers)){return new ArrayList<>();}

View File

@@ -206,6 +206,22 @@ public class BrokerServiceImpl extends BaseVersionControlService implements Brok
return ConvertUtil.obj2Obj(brokerDAO.selectOne(lambdaQueryWrapper), Broker.class);
}
+@Override
+public Broker getBrokerFromCacheFirst(Long clusterPhyId, Integer brokerId) {
+List<Broker> brokerList = this.listAliveBrokersFromCacheFirst(clusterPhyId);
+if (brokerList == null) {
+return null;
+}
+for (Broker broker: brokerList) {
+if (brokerId.equals(broker.getBrokerId())) {
+return broker;
+}
+}
+return null;
+}
@Override
public Result<Map<String, LogDirDescription>> getBrokerLogDirDescFromKafka(Long clusterPhyId, Integer brokerId) {
try {

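getBrokerFromCacheFirst above is a null-safe linear scan of the cached alive-broker list; per-cluster broker counts are small, so scanning beats maintaining a second id-keyed cache. A stream-based equivalent, using a stub Broker type rather than the project's class:

```java
import java.util.List;

class BrokerLookupSketch {
    static class Broker {
        Integer brokerId; // stub: the real class also carries host, port, JMX info, etc.
    }

    // Same contract as getBrokerFromCacheFirst: null when the list is null or the id is absent.
    static Broker find(List<Broker> aliveBrokers, Integer brokerId) {
        if (aliveBrokers == null) {
            return null;
        }
        return aliveBrokers.stream()
                .filter(b -> brokerId.equals(b.brokerId))
                .findFirst()
                .orElse(null);
    }
}
```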
View File

@@ -5,14 +5,19 @@ import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
import com.xiaojukeji.know.streaming.km.common.bean.po.changerecord.KafkaChangeRecordPO;
import com.xiaojukeji.know.streaming.km.core.service.change.record.KafkaChangeRecordService;
import com.xiaojukeji.know.streaming.km.persistence.mysql.changerecord.KafkaChangeRecordDAO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
@Service
public class KafkaChangeRecordServiceImpl implements KafkaChangeRecordService {
private static final ILog log = LogFactory.getLog(KafkaChangeRecordServiceImpl.class);
@@ -20,11 +25,24 @@ public class KafkaChangeRecordServiceImpl implements KafkaChangeRecordService {
@Autowired
private KafkaChangeRecordDAO kafkaChangeRecordDAO;
+private static final Cache<String, String> recordCache = Caffeine.newBuilder()
+.expireAfterWrite(12, TimeUnit.HOURS)
+.maximumSize(1000)
+.build();
@Override
public int insertAndIgnoreDuplicate(KafkaChangeRecordPO recordPO) {
try {
+String cacheData = recordCache.getIfPresent(recordPO.getUniqueField());
+if (cacheData != null || this.checkExistInDB(recordPO.getUniqueField())) {
+// Already exists, so return directly
+return 0;
+}
+recordCache.put(recordPO.getUniqueField(), recordPO.getUniqueField());
return kafkaChangeRecordDAO.insert(recordPO);
-} catch (DuplicateKeyException dke) {
+} catch (Exception e) {
return 0;
}
}
@@ -40,4 +58,12 @@ public class KafkaChangeRecordServiceImpl implements KafkaChangeRecordService {
/**************************************************** private method ****************************************************/
+private boolean checkExistInDB(String uniqueField) {
+LambdaQueryWrapper<KafkaChangeRecordPO> lambdaQueryWrapper = new LambdaQueryWrapper<>();
+lambdaQueryWrapper.eq(KafkaChangeRecordPO::getUniqueField, uniqueField);
+List<KafkaChangeRecordPO> poList = kafkaChangeRecordDAO.selectList(lambdaQueryWrapper);
+return poList != null && !poList.isEmpty();
+}
}

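The change record service now short-circuits duplicates through a 12-hour Caffeine cache and a DB existence check before attempting the insert, instead of relying solely on the unique key to throw. A standalone sketch of that insert-once pattern (the Set stands in for the DAO):

```java
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

public class InsertOnceDemo {
    private static final Cache<String, Boolean> SEEN = Caffeine.newBuilder()
            .expireAfterWrite(12, TimeUnit.HOURS)
            .maximumSize(1000)
            .build();

    private static final Set<String> FAKE_DB = new HashSet<>(); // stand-in for kafkaChangeRecordDAO

    // Returns 1 if inserted, 0 if suppressed as a duplicate.
    static int insertAndIgnoreDuplicate(String uniqueField) {
        if (SEEN.getIfPresent(uniqueField) != null || FAKE_DB.contains(uniqueField)) {
            return 0; // known duplicate: skip the insert entirely
        }
        SEEN.put(uniqueField, Boolean.TRUE);
        return FAKE_DB.add(uniqueField) ? 1 : 0;
    }

    public static void main(String[] args) {
        System.out.println(insertAndIgnoreDuplicate("rec-1")); // 1
        System.out.println(insertAndIgnoreDuplicate("rec-1")); // 0
    }
}
```

The cache is best-effort: two concurrent writers can both miss it and race to the DB, so the unique key and the broadened catch remain the real guard.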
View File

@@ -73,5 +73,5 @@ public interface ClusterPhyService {
* Get the list of Kafka versions present in the system
* @return
*/
-Set<String> getClusterVersionSet();
+List<String> getClusterVersionList();
}

View File

@@ -126,7 +126,7 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
private TopicMetricService topicMetricService;
@Autowired
private TopicService topicService;
@Autowired
private PartitionService partitionService;
@@ -728,13 +728,10 @@ public class ClusterMetricServiceImpl extends BaseMetricService implements Clust
Long clusterId = param.getClusterId();
// 1. Get the JMX attribute info
VersionJmxInfo jmxInfo = getJMXInfo(clusterId, metric);
if(null == jmxInfo){return Result.buildFailure(VC_ITEM_JMX_NOT_EXIST);}
List<Broker> brokers = brokerService.listAliveBrokersFromDB(clusterId);
float metricVale = 0f;
-for(Broker broker : brokers){
+for(Broker broker : brokers) {
Result<BrokerMetrics> ret = brokerMetricService.collectBrokerMetricsFromKafkaWithCacheFirst(clusterId, broker.getBrokerId(), metric);
if(null == ret || ret.failed() || null == ret.getData()){continue;}

View File

@@ -24,8 +24,9 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -111,7 +112,7 @@ public class ClusterPhyServiceImpl implements ClusterPhyService {
throw new DuplicateException(String.format("clusterName:%s duplicated", clusterPhyPO.getName()));
} catch (Exception e) {
-log.error("cmethod=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster failed||errMsg=exception!", clusterPhyPO.getId(), operator, e);
+log.error("method=addClusterPhy||clusterPhyId={}||operator={}||msg=add cluster failed||errMsg=exception!", clusterPhyPO.getId(), operator, e);
throw new AdminOperateException("add cluster failed", e, ResultStatus.MYSQL_OPERATE_FAILED);
}
@@ -205,9 +206,12 @@ public class ClusterPhyServiceImpl implements ClusterPhyService {
}
@Override
-public Set<String> getClusterVersionSet() {
-List<ClusterPhy> clusterPhyList = listAllClusters();
-Set<String> versionSet = clusterPhyList.stream().map(elem -> elem.getKafkaVersion()).collect(Collectors.toSet());
-return versionSet;
+public List<String> getClusterVersionList() {
+List<ClusterPhy> clusterPhyList = this.listAllClusters();
+List<String> versionList = new ArrayList<>(clusterPhyList.stream().map(elem -> elem.getKafkaVersion()).collect(Collectors.toSet()));
+Collections.sort(versionList);
+return versionList;
}
}

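The rewrite dedupes through a Set, copies into a list, and sorts. A one-pass stream equivalent for comparison; note that sorting strings is lexicographic, so "0.10.x" orders before "0.9.x" even though it is the newer release line:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class VersionListSketch {
    // Same result as getClusterVersionList: distinct versions in natural (string) order.
    static List<String> sortedVersions(List<String> kafkaVersions) {
        return kafkaVersions.stream().distinct().sorted().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(sortedVersions(Arrays.asList("2.8.1", "0.10.2", "2.8.1", "3.0.0")));
        // prints [0.10.2, 2.8.1, 3.0.0]
    }
}
```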
View File

@@ -56,7 +56,7 @@ public class KafkaControllerServiceImpl implements KafkaControllerService {
@Override
public int insertAndIgnoreDuplicateException(KafkaController kafkaController) {
try {
-Broker broker = brokerService.getBroker(kafkaController.getClusterPhyId(), kafkaController.getBrokerId());
+Broker broker = brokerService.getBrokerFromCacheFirst(kafkaController.getClusterPhyId(), kafkaController.getBrokerId());
KafkaControllerPO kafkaControllerPO = new KafkaControllerPO();
kafkaControllerPO.setClusterPhyId(kafkaController.getClusterPhyId());
@@ -136,34 +136,56 @@ public class KafkaControllerServiceImpl implements KafkaControllerService {
/**************************************************** private method ****************************************************/
private Result<KafkaController> getControllerFromAdminClient(ClusterPhy clusterPhy) {
AdminClient adminClient = null;
try {
adminClient = kafkaAdminClient.getClient(clusterPhy.getId());
} catch (Exception e) {
log.error("class=KafkaControllerServiceImpl||method=getControllerFromAdminClient||clusterPhyId={}||errMsg=exception", clusterPhy.getId(), e);
-return Result.buildFromRSAndMsg(ResultStatus.KAFKA_OPERATE_FAILED, e.getMessage());
+// The cluster is loaded but creating the admin-client failed, so treat it as having no controller
+return Result.buildSuc();
}
-DescribeClusterResult describeClusterResult = adminClient.describeCluster(new DescribeClusterOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS));
-Node controllerNode = describeClusterResult.controller().get();
-if (controllerNode == null) {
-return Result.buildSuc();
-}
-return Result.buildSuc(new KafkaController(
-clusterPhy.getId(),
-controllerNode.id(),
-System.currentTimeMillis()
-));
+// First fetch this cluster's controller from the DB
+KafkaController dbKafkaController = null;
+for (int i = 1; i <= Constant.DEFAULT_RETRY_TIME; ++i) {
+try {
+if (i == 1) {
+// Get the controller info recorded in the DB
+dbKafkaController = this.getKafkaControllerFromDB(clusterPhy.getId());
+}
+DescribeClusterResult describeClusterResult = adminClient.describeCluster(
+new DescribeClusterOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)
+);
+Node controllerNode = describeClusterResult.controller().get();
+if (controllerNode == null) {
+return Result.buildSuc();
+}
+if (dbKafkaController != null && controllerNode.id() == dbKafkaController.getBrokerId()) {
+// The controller ID has not changed, so return the existing record
+return Result.buildSuc(dbKafkaController);
+}
+// The controller has changed
+return Result.buildSuc(new KafkaController(
+clusterPhy.getId(),
+controllerNode.id(),
+System.currentTimeMillis()
+));
+} catch (Exception e) {
+log.error(
+"class=KafkaControllerServiceImpl||method=getControllerFromAdminClient||clusterPhyId={}||tryTime={}||errMsg=exception",
+clusterPhy.getId(), i, e
+);
+}
+}
+// After three failed attempts, return "no controller"
+return Result.buildSuc();
}
private Result<KafkaController> getControllerFromZKClient(ClusterPhy clusterPhy) {

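The rewritten getControllerFromAdminClient above wraps describeCluster in a bounded retry: read the DB row once, return it early when the controller id is unchanged, and degrade to "no controller" after Constant.DEFAULT_RETRY_TIME failures rather than surfacing an error. The general shape as a hypothetical helper (the count of 3 is inferred from the "three failed attempts" comment, not from the constant itself):

```java
import java.util.Optional;
import java.util.function.Supplier;

class BoundedRetrySketch {
    static final int DEFAULT_RETRY_TIME = 3; // assumed value of Constant.DEFAULT_RETRY_TIME

    // Try the action a few times; degrade to empty instead of propagating the failure.
    static <T> Optional<T> retryOrEmpty(Supplier<T> action) {
        for (int i = 1; i <= DEFAULT_RETRY_TIME; ++i) {
            try {
                return Optional.ofNullable(action.get());
            } catch (Exception e) {
                System.err.printf("tryTime=%d||errMsg=%s%n", i, e.getMessage());
            }
        }
        return Optional.empty(); // all attempts failed: report "no controller"
    }
}
```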
View File

@@ -7,6 +7,7 @@ import com.didiglobal.logi.log.LogFactory;
import com.didiglobal.logi.security.common.dto.oplog.OplogDTO;
import com.didiglobal.logi.security.util.PWEncryptUtil;
import com.xiaojukeji.know.streaming.km.common.bean.dto.pagination.PaginationBaseDTO;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
import com.xiaojukeji.know.streaming.km.common.bean.entity.kafkauser.KafkaUser;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.kafkauser.KafkaUserParam;
@@ -17,11 +18,13 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.bean.po.KafkaUserPO;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.constant.MsgConstant;
+import com.xiaojukeji.know.streaming.km.common.enums.cluster.ClusterAuthTypeEnum;
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.ModuleEnum;
import com.xiaojukeji.know.streaming.km.common.enums.operaterecord.OperationEnum;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
import com.xiaojukeji.know.streaming.km.common.utils.ValidateUtils;
+import com.xiaojukeji.know.streaming.km.core.service.cluster.ClusterPhyService;
import com.xiaojukeji.know.streaming.km.core.service.kafkauser.KafkaUserService;
import com.xiaojukeji.know.streaming.km.core.service.oprecord.OpLogWrapService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
@@ -32,7 +35,6 @@ import kafka.admin.ConfigCommand;
import kafka.server.ConfigType;
import kafka.zk.*;
import org.apache.kafka.clients.admin.*;
-import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.security.scram.ScramCredential;
import org.apache.kafka.common.security.scram.internals.ScramCredentialUtils;
import org.apache.kafka.common.security.scram.internals.ScramFormatter;
@@ -71,6 +73,9 @@ public class KafkaUserServiceImpl extends BaseVersionControlService implements K
@Autowired
private OpLogWrapService opLogWrapService;
+@Autowired
+private ClusterPhyService clusterPhyService;
@Override
protected VersionItemTypeEnum getVersionItemType() {
return VersionItemTypeEnum.SERVICE_OP_KAFKA_USER;
@@ -571,6 +576,18 @@ public class KafkaUserServiceImpl extends BaseVersionControlService implements K
private Result<List<KafkaUser>> getKafkaUserByKafkaClient(VersionItemParam itemParam) {
KafkaUserParam param = (KafkaUserParam) itemParam;
try {
+// Get the cluster
+ClusterPhy clusterPhy = clusterPhyService.getClusterByCluster(param.getClusterPhyId());
+if (clusterPhy == null) {
+return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(param.getClusterPhyId()));
+}
+// If the auth mode is not SCRAM, return immediately
+if (!ClusterAuthTypeEnum.isScram(clusterPhy.getAuthType())) {
+log.warn("method=getKafkaUserByKafkaClient||clusterPhyId={}||msg=not scram auth type and ignore get users", clusterPhy.getId());
+return Result.buildSuc(new ArrayList<>());
+}
AdminClient adminClient = kafkaAdminClient.getClient(param.getClusterPhyId());
// Query the cluster's kafka-users

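The SCRAM guard pairs with the client path that follows: brokers at 2.7+ (KIP-554) can list SCRAM credentials over the wire, which is roughly the call below. A standalone sketch, assuming the stock describeUserScramCredentials API from kafka-clients (the bootstrap address is a placeholder):

```java
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.UserScramCredentialsDescription;

public class ListScramUsersDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // Every user that has SCRAM credentials registered, with mechanism and iteration info.
            Map<String, UserScramCredentialsDescription> users =
                    admin.describeUserScramCredentials().all().get();
            users.keySet().forEach(System.out::println);
        }
    }
}
```

Older clusters keep credentials in ZooKeeper, which is why this service stays version-gated instead of using the client path unconditionally.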
View File

@@ -0,0 +1,14 @@
package com.xiaojukeji.know.streaming.km.core.service.partition;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import org.apache.kafka.common.TopicPartition;
import java.util.List;
public interface OpPartitionService {
/**
* Preferred replica election
*/
Result<Void> preferredReplicaElection(Long clusterPhyId, List<TopicPartition> tpList);
}

View File

@@ -0,0 +1,119 @@
package com.xiaojukeji.know.streaming.km.core.service.partition.impl;
import com.didiglobal.logi.log.ILog;
import com.didiglobal.logi.log.LogFactory;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.VersionItemParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.param.partition.BatchPartitionParam;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.Result;
import com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus;
import com.xiaojukeji.know.streaming.km.common.constant.KafkaConstant;
import com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum;
import com.xiaojukeji.know.streaming.km.common.exception.VCHandlerNotExistException;
import com.xiaojukeji.know.streaming.km.core.service.partition.OpPartitionService;
import com.xiaojukeji.know.streaming.km.core.service.version.BaseVersionControlService;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminClient;
import com.xiaojukeji.know.streaming.km.persistence.kafka.KafkaAdminZKClient;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ElectLeadersOptions;
import org.apache.kafka.clients.admin.ElectLeadersResult;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import scala.jdk.javaapi.CollectionConverters;
import javax.annotation.PostConstruct;
import java.util.HashSet;
import java.util.List;
import static com.xiaojukeji.know.streaming.km.common.bean.entity.result.ResultStatus.VC_HANDLE_NOT_EXIST;
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionEnum.*;
import static com.xiaojukeji.know.streaming.km.common.enums.version.VersionItemTypeEnum.SERVICE_OP_PARTITION_LEADER;
/**
* @author didi
*/
@Service
public class OpPartitionServiceImpl extends BaseVersionControlService implements OpPartitionService {
private static final ILog LOGGER = LogFactory.getLog(OpPartitionServiceImpl.class);
@Autowired
private KafkaAdminClient kafkaAdminClient;
@Autowired
private KafkaAdminZKClient kafkaAdminZKClient;
public static final String PREFERRED_REPLICA_ELECTION = "PreferredReplicaElection";
@Override
protected VersionItemTypeEnum getVersionItemType() {
return SERVICE_OP_PARTITION_LEADER;
}
@PostConstruct
private void init() {
registerVCHandler(PREFERRED_REPLICA_ELECTION, V_0_10_0_0, V_2_8_0, "preferredReplicaElectionByZKClient", this::preferredReplicaElectionByZKClient);
registerVCHandler(PREFERRED_REPLICA_ELECTION, V_2_8_0, V_MAX, "preferredReplicaElectionByKafkaClient", this::preferredReplicaElectionByKafkaClient);
}
@Override
public Result<Void> preferredReplicaElection(Long clusterPhyId, List<TopicPartition> tpList) {
try {
return (Result<Void>) doVCHandler(
clusterPhyId,
PREFERRED_REPLICA_ELECTION,
new BatchPartitionParam(clusterPhyId, tpList)
);
} catch (VCHandlerNotExistException e) {
return Result.buildFailure(VC_HANDLE_NOT_EXIST);
}
}
/**************************************************** private method ****************************************************/
private Result<Void> preferredReplicaElectionByZKClient(VersionItemParam itemParam) {
BatchPartitionParam partitionParam = (BatchPartitionParam) itemParam;
try {
KafkaZkClient kafkaZkClient = kafkaAdminZKClient.getClient(partitionParam.getClusterPhyId());
kafkaZkClient.createPreferredReplicaElection(CollectionConverters.asScala(partitionParam.getTpList()).toSet());
return Result.buildSuc();
} catch (Exception e) {
LOGGER.error(
"class=OpPartitionServiceImpl||method=preferredReplicaElectionByZKClient||clusterPhyId={}||errMsg=exception",
partitionParam.getClusterPhyId(), e
);
return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage());
}
}
private Result<Void> preferredReplicaElectionByKafkaClient(VersionItemParam itemParam) {
BatchPartitionParam partitionParam = (BatchPartitionParam) itemParam;
try {
AdminClient adminClient = kafkaAdminClient.getClient(partitionParam.getClusterPhyId());
ElectLeadersResult electLeadersResult = adminClient.electLeaders(
ElectionType.PREFERRED,
new HashSet<>(partitionParam.getTpList()),
new ElectLeadersOptions().timeoutMs(KafkaConstant.ADMIN_CLIENT_REQUEST_TIME_OUT_UNIT_MS)
);
electLeadersResult.all().get();
return Result.buildSuc();
} catch (Exception e) {
LOGGER.error(
"class=OpPartitionServiceImpl||method=preferredReplicaElectionByKafkaClient||clusterPhyId={}||errMsg=exception",
partitionParam.getClusterPhyId(), e
);
return Result.buildFromRSAndMsg(ResultStatus.ZK_OPERATE_FAILED, e.getMessage());
}
}
}

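For reference, the client-side branch registered above boils down to a single electLeaders call. A standalone kafka-clients sketch (topic, partition, and address are placeholders):

```java
import java.util.Collections;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ElectLeadersOptions;
import org.apache.kafka.common.ElectionType;
import org.apache.kafka.common.TopicPartition;

public class PreferredElectionDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            Set<TopicPartition> partitions = Collections.singleton(new TopicPartition("demo-topic", 0));
            // Ask the controller to hand leadership back to each partition's preferred replica.
            admin.electLeaders(ElectionType.PREFERRED, partitions,
                            new ElectLeadersOptions().timeoutMs(5000))
                 .all()
                 .get();
        }
    }
}
```

One quirk of the implementation above: the client-path failure maps to ResultStatus.ZK_OPERATE_FAILED even though no ZooKeeper is involved.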
View File

@@ -75,7 +75,9 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par
@Override
public Result<List<PartitionMetrics>> collectPartitionsMetricsFromKafkaWithCache(Long clusterPhyId, String topicName, String metricName) {
-List<PartitionMetrics> metricsList = CollectedMetricsLocalCache.getPartitionMetricsList(clusterPhyId, topicName, metricName);
+String partitionMetricsKey = CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName);
+List<PartitionMetrics> metricsList = CollectedMetricsLocalCache.getPartitionMetricsList(partitionMetricsKey);
if(null != metricsList) {
return Result.buildSuc(metricsList);
}
@@ -88,12 +90,7 @@ public class PartitionMetricServiceImpl extends BaseMetricService implements Par
// Update the cache
PartitionMetrics metrics = metricsResult.getData().get(0);
metrics.getMetrics().entrySet().forEach(
-metricEntry -> CollectedMetricsLocalCache.putPartitionMetricsList(
-clusterPhyId,
-metrics.getTopic(),
-metricEntry.getKey(),
-metricsResult.getData()
-)
+metricEntry -> CollectedMetricsLocalCache.putPartitionMetricsList(partitionMetricsKey, metricsResult.getData())
);
return metricsResult;

View File

@@ -77,9 +77,14 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli
}
@Override
-public Result<ReplicationMetrics> collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId, String topic,
-Integer brokerId, Integer partitionId, String metric){
-Float keyValue = CollectedMetricsLocalCache.getReplicaMetrics(clusterPhyId, brokerId, topic, partitionId, metric);
+public Result<ReplicationMetrics> collectReplicaMetricsFromKafkaWithCache(Long clusterPhyId,
+String topic,
+Integer brokerId,
+Integer partitionId,
+String metric) {
+String replicaMetricsKey = CollectedMetricsLocalCache.genReplicaMetricCacheKey(clusterPhyId, brokerId, topic, partitionId, metric);
+Float keyValue = CollectedMetricsLocalCache.getReplicaMetrics(replicaMetricsKey);
if(null != keyValue){
ReplicationMetrics replicationMetrics = new ReplicationMetrics(clusterPhyId, topic, partitionId, brokerId);
replicationMetrics.putMetric(metric, keyValue);
@@ -92,11 +97,7 @@ public class ReplicaMetricServiceImpl extends BaseMetricService implements Repli
// Update the cache
ret.getData().getMetrics().entrySet().stream().forEach(
metricNameAndValueEntry -> CollectedMetricsLocalCache.putReplicaMetrics(
-clusterPhyId,
-brokerId,
-topic,
-partitionId,
-metricNameAndValueEntry.getKey(),
+replicaMetricsKey,
metricNameAndValueEntry.getValue()
)
);

View File

@@ -120,7 +120,9 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe
@Override
public Result<List<TopicMetrics>> collectTopicMetricsFromKafkaWithCacheFirst(Long clusterPhyId, String topicName, String metricName) {
-List<TopicMetrics> metricsList = CollectedMetricsLocalCache.getTopicMetrics(clusterPhyId, topicName, metricName);
+String topicMetricsKey = CollectedMetricsLocalCache.genClusterTopicMetricKey(clusterPhyId, topicName, metricName);
+List<TopicMetrics> metricsList = CollectedMetricsLocalCache.getTopicMetrics(topicMetricsKey);
if(null != metricsList) {
return Result.buildSuc(metricsList);
}
@@ -133,12 +135,7 @@ public class TopicMetricServiceImpl extends BaseMetricService implements TopicMe
// Update the cache
TopicMetrics metrics = metricsResult.getData().get(0);
metrics.getMetrics().entrySet().forEach(
-metricEntry -> CollectedMetricsLocalCache.putTopicMetrics(
-clusterPhyId,
-metrics.getTopic(),
-metricEntry.getKey(),
-metricsResult.getData()
-)
+metricEntry -> CollectedMetricsLocalCache.putTopicMetrics(topicMetricsKey, metricsResult.getData())
);
return metricsResult;

View File

@@ -35,6 +35,7 @@ public class ClusterMetricVersionItems extends BaseMetricVersionMetric {
public static final String CLUSTER_METRIC_HEALTH_SCORE_CLUSTER = "HealthScore_Cluster";
public static final String CLUSTER_METRIC_HEALTH_CHECK_PASSED_CLUSTER = "HealthCheckPassed_Cluster";
public static final String CLUSTER_METRIC_HEALTH_CHECK_TOTAL_CLUSTER = "HealthCheckTotal_Cluster";
public static final String CLUSTER_METRIC_TOTAL_REQ_QUEUE_SIZE = "TotalRequestQueueSize";
public static final String CLUSTER_METRIC_TOTAL_RES_QUEUE_SIZE = "TotalResponseQueueSize";
public static final String CLUSTER_METRIC_EVENT_QUEUE_SIZE = "EventQueueSize";