v2.1版本更新

This commit is contained in:
zengqiao
2020-12-19 00:27:16 +08:00
parent 3fea5c9c8c
commit 49280a8617
75 changed files with 1098 additions and 148 deletions

View File

@@ -79,6 +79,7 @@ public class LogicalClusterMetadataManager {
Long logicalClusterId = logicalClusterIdMap.get(topicName);
if (ValidateUtils.isNull(logicalClusterId)) {
LOGGER.debug("class=LogicalClusterMetadataManager||method=getTopicLogicalCluster||topicName={}||msg=logicalClusterId is null!",topicName);
return null;
}
return LOGICAL_CLUSTER_MAP.get(logicalClusterId);
@@ -107,6 +108,7 @@ public class LogicalClusterMetadataManager {
public Long getPhysicalClusterId(Long logicalClusterId) {
if (ValidateUtils.isNull(logicalClusterId)) {
LOGGER.debug("class=LogicalClusterMetadataManager||method=getPhysicalClusterId||msg=logicalClusterId is null!");
return null;
}
if (!LOADED.get()) {
@@ -114,6 +116,7 @@ public class LogicalClusterMetadataManager {
}
LogicalClusterDO logicalClusterDO = LOGICAL_CLUSTER_MAP.get(logicalClusterId);
if (ValidateUtils.isNull(logicalClusterDO)) {
LOGGER.debug("class=LogicalClusterMetadataManager||method=getPhysicalClusterId||logicalClusterId={}||msg=logicalClusterDO is null!",logicalClusterId);
return null;
}
return logicalClusterDO.getClusterId();
@@ -124,6 +127,7 @@ public class LogicalClusterMetadataManager {
return clusterId;
}
if (ValidateUtils.isNull(clusterId)) {
LOGGER.warn("class=LogicalClusterMetadataManager||method=getPhysicalClusterId||isPhysicalClusterId={}||msg=clusterId is null!",isPhysicalClusterId);
return null;
}
if (!LOADED.get()) {
@@ -131,6 +135,7 @@ public class LogicalClusterMetadataManager {
}
LogicalClusterDO logicalClusterDO = LOGICAL_CLUSTER_MAP.get(clusterId);
if (ValidateUtils.isNull(logicalClusterDO)) {
LOGGER.debug("class=LogicalClusterMetadataManager||method=getPhysicalClusterId||clusterId={}||msg=logicalClusterDO is null!",clusterId);
return null;
}
return logicalClusterDO.getClusterId();
@@ -171,8 +176,7 @@ public class LogicalClusterMetadataManager {
for (Long regionId: regionIdList) {
RegionDO regionDO = regionMap.get(regionId);
if (ValidateUtils.isNull(regionDO) || !logicalClusterDO.getClusterId().equals(regionDO.getClusterId())) {
LOGGER.warn("flush logical cluster metadata failed, exist illegal region, logicalCluster:{} region:{}.",
logicalClusterDO, regionId);
LOGGER.warn("flush logical cluster metadata failed, exist illegal region, logicalCluster:{} region:{}.", logicalClusterDO, regionId);
continue;
}
brokerIdSet.addAll(ListUtils.string2IntList(regionDO.getBrokerList()));

View File

@@ -86,19 +86,36 @@ public class PhysicalClusterMetadataManager {
if (ZK_CONFIG_MAP.containsKey(clusterDO.getId())) {
return;
}
ZkConfigImpl zkConfig = new ZkConfigImpl(clusterDO.getZookeeper());
//增加Broker监控
// 初始化broker-map
BROKER_METADATA_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
JMX_CONNECTOR_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
KAFKA_VERSION_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
// 初始化topic-map
TOPIC_METADATA_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
TOPIC_RETENTION_TIME_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
// 初始化cluster-map
CLUSTER_MAP.put(clusterDO.getId(), clusterDO);
if (!zkConfig.checkPathExists(ZkPathUtil.BROKER_ROOT_NODE)) {
LOGGER.info("ignore add cluster, zk path=/brokers not exist, clusterId:{}.", clusterDO.getId());
try {
zkConfig.close();
} catch (Exception e) {
LOGGER.warn("ignore add cluster, close zk connection failed, cluster:{}.", clusterDO, e);
}
return;
}
//增加Broker监控
BrokerStateListener brokerListener = new BrokerStateListener(clusterDO.getId(), zkConfig, configUtils.getJmxMaxConn());
brokerListener.init();
zkConfig.watchChildren(ZkPathUtil.BROKER_IDS_ROOT, brokerListener);
//增加Topic监控
TOPIC_METADATA_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
TopicStateListener topicListener = new TopicStateListener(clusterDO.getId(), zkConfig);
topicListener.init();
zkConfig.watchChildren(ZkPathUtil.BROKER_TOPICS_ROOT, topicListener);
@@ -109,10 +126,6 @@ public class PhysicalClusterMetadataManager {
controllerListener.init();
zkConfig.watch(ZkPathUtil.CONTROLLER_ROOT_NODE, controllerListener);
//增加Config变更监控
TOPIC_RETENTION_TIME_MAP.put(clusterDO.getId(), new ConcurrentHashMap<>());
CLUSTER_MAP.put(clusterDO.getId(), clusterDO);
ZK_CONFIG_MAP.put(clusterDO.getId(), zkConfig);
} catch (Exception e) {
LOGGER.error("add cluster failed, cluster:{}.", clusterDO, e);
@@ -444,8 +457,16 @@ public class PhysicalClusterMetadataManager {
return kafkaVersion;
}
public String getKafkaVersion(Long clusterId) {
return getKafkaVersion(clusterId, PhysicalClusterMetadataManager.getBrokerIdList(clusterId));
public String getKafkaVersionFromCache(Long clusterId) {
Set<String> kafkaVersionSet = new HashSet<>();
for (Integer brokerId: PhysicalClusterMetadataManager.getBrokerIdList(clusterId)) {
String kafkaVersion = this.getKafkaVersionFromCache(clusterId, brokerId);
if (ValidateUtils.isBlank(kafkaVersion)) {
continue;
}
kafkaVersionSet.add(kafkaVersion);
}
return ListUtils.strList2String(new ArrayList<>(kafkaVersionSet));
}
public String getKafkaVersion(Long clusterId, List<Integer> brokerIdList) {

View File

@@ -31,6 +31,8 @@ public interface ConfigService {
List<ConfigDO> listAll();
Integer getAutoPassedTopicApplyOrderNumPerTask();
CreateTopicElemConfig getCreateTopicConfig(Long clusterId, String systemCode);
ClusterDO getClusterDO(Long clusterId);

View File

@@ -31,6 +31,11 @@ public interface JmxService {
TopicMetrics getTopicMetrics(Long clusterId, Integer brokerId, String topicName, Integer metricsCode, Boolean byAdd);
/**
* 获取topic消息压缩指标
*/
String getTopicCodeCValue(Long clusterId, String topicName);
List<TopicMetrics> getTopicMetrics(Long clusterId, Integer metricsCode, Boolean byAdd);
/**

View File

@@ -64,7 +64,7 @@ public interface TopicService {
/**
* 获取Topic的分区的offset
*/
Map<TopicPartition, Long> getPartitionOffset(ClusterDO cluster, String topicName, OffsetPosEnum offsetPosEnum);
Map<TopicPartition, Long> getPartitionOffset(ClusterDO clusterDO, String topicName, OffsetPosEnum offsetPosEnum);
/**
* 获取Topic概览信息

View File

@@ -51,6 +51,13 @@ public interface AppService {
*/
List<AppDO> getByPrincipal(String principal);
/**
* 通过appId来查,需要check当前登录人是否有权限.
* @param appId appId
* @return AppDO
*/
AppDO getAppByUserAndId(String appId, String curUser);
/**
* 通过appId来查
* @param appId appId

View File

@@ -11,7 +11,7 @@ import java.util.List;
* @date 20/4/13
*/
public interface TopicConnectionService {
int batchAdd(List<TopicConnectionDO> doList);
void batchAdd(List<TopicConnectionDO> doList);
/**
* 查询连接信息

View File

@@ -32,6 +32,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* @author zhongyuankai
@@ -59,10 +60,13 @@ public class AppServiceImpl implements AppService {
@Autowired
private OperateRecordService operateRecordService;
@Override
public ResultStatus addApp(AppDO appDO) {
try {
if (appDao.insert(appDO) < 1) {
LOGGER.warn("class=AppServiceImpl||method=addApp||AppDO={}||msg=add fail,{}",appDO,ResultStatus.MYSQL_ERROR.getMessage());
return ResultStatus.MYSQL_ERROR;
}
KafkaUserDO kafkaUserDO = new KafkaUserDO();
@@ -72,6 +76,7 @@ public class AppServiceImpl implements AppService {
kafkaUserDO.setUserType(0);
kafkaUserDao.insert(kafkaUserDO);
} catch (DuplicateKeyException e) {
LOGGER.error("class=AppServiceImpl||method=addApp||errMsg={}||appDO={}|", e.getMessage(), appDO, e);
return ResultStatus.RESOURCE_ALREADY_EXISTED;
} catch (Exception e) {
LOGGER.error("add app failed, appDO:{}.", appDO, e);
@@ -139,23 +144,42 @@ public class AppServiceImpl implements AppService {
return ResultStatus.SUCCESS;
}
} catch (DuplicateKeyException e) {
LOGGER.error("class=AppServiceImpl||method=updateByAppId||errMsg={}||AppDTO={}||operator={}||adminApi={}", e.getMessage(), dto, operator, adminApi, e);
return ResultStatus.RESOURCE_NAME_DUPLICATED;
} catch (Exception e) {
LOGGER.error("update app failed, dto:{}, operator:{}, adminApi:{}.", dto, operator, adminApi, e);
}
LOGGER.warn("class=AppServiceImpl||method=updateByAppId||dto={}||operator={}||adminApi={}||msg=update app fail,{}!", dto,operator,adminApi,ResultStatus.MYSQL_ERROR.getMessage());
return ResultStatus.MYSQL_ERROR;
}
@Override
public List<AppDO> getByPrincipal(String principals) {
public List<AppDO> getByPrincipal(String principal) {
try {
return appDao.getByPrincipal(principals);
List<AppDO> appDOs = appDao.getByPrincipal(principal);
if (!ValidateUtils.isEmptyList(appDOs)) {
return appDOs.stream()
.filter(appDO -> ListUtils.string2StrList(appDO.getPrincipals()).contains(principal))
.collect(Collectors.toList());
}
} catch (Exception e) {
LOGGER.error("get app list failed, principals:{}.", principals);
LOGGER.error("get app list failed, principals:{}.", principal);
}
return new ArrayList<>();
}
@Override
public AppDO getAppByUserAndId(String appId, String curUser) {
AppDO appDO = this.getByAppId(appId);
if (appDO != null) {
if (ListUtils.string2StrList(appDO.getPrincipals()).contains(curUser)) {
return appDO;
}
}
LOGGER.debug("class=AppServiceImpl||method=getAppByUserAndId||appId={}||curUser={}||msg=appDO is null!", appId, curUser);
return null;
}
@Override
public AppDO getByAppId(String appId) {
try {
@@ -177,6 +201,7 @@ public class AppServiceImpl implements AppService {
// 查询AppID
AppDO appDO = appDao.getByAppId(appId);
if (ValidateUtils.isNull(appDO)) {
LOGGER.debug("class=AppServiceImpl||method=getAppTopicDTOList||appId={}||msg=appDO is null!", appId);
return new ArrayList<>();
}
@@ -220,6 +245,7 @@ public class AppServiceImpl implements AppService {
appTopicDTO.setLogicalClusterId(logicalClusterDO.getId());
appTopicDTO.setLogicalClusterName(logicalClusterDO.getName());
} else {
LOGGER.warn("class=AppServiceImpl||method=getAppTopicDTOList||clusterId={}||topicName={}||msg=logicalClusterDO is null!", authorityDO.getClusterId(), authorityDO.getTopicName());
continue;
}
appTopicDTO.setOperator("");

View File

@@ -120,6 +120,7 @@ public class GatewayConfigServiceImpl implements GatewayConfigService {
try {
doList = gatewayConfigDao.getByConfigType(GatewayConfigKeyEnum.SD_SP_RATE.getConfigType());
if (ValidateUtils.isEmptyList(doList)) {
LOGGER.debug("class=GatewayConfigServiceImpl||method=getSpRateConfig||requestVersion={}||msg=doList is empty!",requestVersion);
return new SpRateConfig(Long.MIN_VALUE, new HashMap<>(0));
}
Long maxVersion = Long.MIN_VALUE;

View File

@@ -27,19 +27,20 @@ public class TopicConnectionServiceImpl implements TopicConnectionService {
private TopicConnectionDao topicConnectionDao;
@Override
public int batchAdd(List<TopicConnectionDO> doList) {
public void batchAdd(List<TopicConnectionDO> doList) {
if (ValidateUtils.isEmptyList(doList)) {
return 0;
return;
}
int count = 0;
for (TopicConnectionDO connectionDO: doList) {
try {
count += topicConnectionDao.replace(connectionDO);
} catch (Exception e) {
LOGGER.error("replace topic connections failed, data:{}.", connectionDO);
LOGGER.error("class=TopicConnectionServiceImpl||method=batchAdd||connectionDO={}||errMsg={}", connectionDO, e.getMessage());
}
}
return count;
LOGGER.info("class=TopicConnectionServiceImpl||method=batchAdd||allSize={}||successSize={}", doList.size(), count);
}
@Override

View File

@@ -150,6 +150,8 @@ public class BrokerServiceImpl implements BrokerService {
for (Integer brokerId: brokerIdSet) {
BrokerMetadata brokerMetadata = PhysicalClusterMetadataManager.getBrokerMetadata(clusterId, brokerId);
if (ValidateUtils.isNull(brokerMetadata)) {
LOGGER.warn("class=BrokerServiceImpl||method=getBrokerOverviewList||brokerId={}|||msg=brokerMetadata is null!",
brokerId);
continue;
}
overviewDTOMap.put(brokerId, BrokerOverviewDTO.newInstance(

View File

@@ -286,7 +286,7 @@ public class ClusterServiceImpl implements ClusterService {
dto.setClusterName(clusterDO.getClusterName());
dto.setZookeeper(clusterDO.getZookeeper());
dto.setBootstrapServers(clusterDO.getBootstrapServers());
dto.setKafkaVersion(physicalClusterMetadataManager.getKafkaVersion(clusterDO.getId()));
dto.setKafkaVersion(physicalClusterMetadataManager.getKafkaVersionFromCache(clusterDO.getId()));
dto.setIdc(configUtils.getIdc());
dto.setSecurityProperties(clusterDO.getSecurityProperties());
dto.setStatus(clusterDO.getStatus());

View File

@@ -42,6 +42,7 @@ public class ConfigServiceImpl implements ConfigService {
} catch (Exception e) {
LOGGER.error("insert config failed, config:{}.", dto, e);
}
LOGGER.warn("class=ConfigServiceImpl||method=insert||dto={}||msg=insert config fail,{}!", dto,ResultStatus.MYSQL_ERROR.getMessage());
return ResultStatus.MYSQL_ERROR;
}
@@ -54,10 +55,12 @@ public class ConfigServiceImpl implements ConfigService {
if (configDao.deleteByKey(configKey) >= 1) {
return ResultStatus.SUCCESS;
}
LOGGER.warn("class=ConfigServiceImpl||method=deleteByKey||configKey={}||msg=delete config fail,{}!", configKey,ResultStatus.CONFIG_NOT_EXIST.getMessage());
return ResultStatus.CONFIG_NOT_EXIST;
} catch (Exception e) {
LOGGER.error("delete config failed, configKey:{}.", configKey, e);
}
LOGGER.warn("class=ConfigServiceImpl||method=deleteByKey||configKey={}||msg=delete config fail,{}!", configKey,ResultStatus.MYSQL_ERROR.getMessage());
return ResultStatus.MYSQL_ERROR;
}
@@ -67,10 +70,12 @@ public class ConfigServiceImpl implements ConfigService {
if (configDao.updateByKey(convert2ConfigDO(dto)) >= 1) {
return ResultStatus.SUCCESS;
}
LOGGER.warn("class=ConfigServiceImpl||method=updateByKey||dto={}||msg=update config fail,{}!", dto,ResultStatus.CONFIG_NOT_EXIST.getMessage());
return ResultStatus.CONFIG_NOT_EXIST;
} catch (Exception e) {
LOGGER.error("update config failed, config:{}.", dto, e);
}
LOGGER.warn("class=ConfigServiceImpl||method=deleteByKey||dto={}||msg=delete config fail,{}!", dto,ResultStatus.MYSQL_ERROR.getMessage());
return ResultStatus.MYSQL_ERROR;
}
@@ -84,10 +89,15 @@ public class ConfigServiceImpl implements ConfigService {
if (configDao.updateByKey(configDO) >= 1) {
return ResultStatus.SUCCESS;
}
LOGGER.warn("class=ConfigServiceImpl||method=updateByKey||configKey={}||configValue={}||msg=update config fail,{}!"
, configKey,configValue,ResultStatus.CONFIG_NOT_EXIST.getMessage());
return ResultStatus.CONFIG_NOT_EXIST;
} catch (Exception e) {
LOGGER.error("update config failed, configValue:{}.", configValue, e);
}
LOGGER.warn("class=ConfigServiceImpl||method=deleteByKey||configKey={}||configValue={}||msg=delete config fail,{}!"
, configKey,configValue,ResultStatus.MYSQL_ERROR.getMessage());
return ResultStatus.MYSQL_ERROR;
}
@@ -161,6 +171,16 @@ public class ConfigServiceImpl implements ConfigService {
return configDO;
}
@Override
public Integer getAutoPassedTopicApplyOrderNumPerTask() {
String configKey = TopicCreationConstant.INNER_CREATE_TOPIC_CONFIG_KEY;
CreateTopicConfig configValue = this.getByKey(configKey, CreateTopicConfig.class);
if (ValidateUtils.isNull(configValue)) {
return TopicCreationConstant.DEFAULT_MAX_PASSED_ORDER_NUM_PER_TASK;
}
return configValue.getMaxPassedOrderNumPerTask();
}
@Override
public CreateTopicElemConfig getCreateTopicConfig(Long clusterId, String systemCode) {
String configKey = TopicCreationConstant.INNER_CREATE_TOPIC_CONFIG_KEY;

View File

@@ -110,6 +110,8 @@ public class ConsumerServiceImpl implements ConsumerService {
ConsumerGroupDTO consumeGroupDTO) {
TopicMetadata topicMetadata = PhysicalClusterMetadataManager.getTopicMetadata(clusterDO.getId(), topicName);
if (topicMetadata == null) {
logger.warn("class=ConsumerServiceImpl||method=getConsumeDetail||clusterId={}||topicName={}||msg=topicMetadata is null!",
clusterDO.getId(), topicName);
return null;
}
@@ -120,6 +122,7 @@ public class ConsumerServiceImpl implements ConsumerService {
consumerGroupDetailDTOList = getConsumerPartitionStateInBroker(clusterDO, topicMetadata, consumeGroupDTO);
}
if (consumerGroupDetailDTOList == null) {
logger.info("class=ConsumerServiceImpl||method=getConsumeDetail||msg=consumerGroupDetailDTOList is null!");
return null;
}
@@ -167,7 +170,7 @@ public class ConsumerServiceImpl implements ConsumerService {
kafkaConsumer.close();
}
}
return new ArrayList<>();
return resultList;
}
private List<Result> resetConsumerOffset(ClusterDO cluster, KafkaConsumer<String, String> kafkaConsumer, ConsumerGroupDTO consumerGroupDTO, Map<TopicPartition, Long> offsetMap) {
@@ -184,7 +187,9 @@ public class ConsumerServiceImpl implements ConsumerService {
}
} catch (Exception e) {
logger.error("reset failed, clusterId:{} consumerGroup:{} topic-partition:{}.", cluster.getId(), consumerGroupDTO, tp, e);
resultList.add(new Result());
resultList.add(new Result(
ResultStatus.OPERATION_FAILED.getCode(),
"reset failed..."));
}
resultList.add(new Result());
}

View File

@@ -1,11 +1,13 @@
package com.xiaojukeji.kafka.manager.service.service.impl;
import com.google.common.base.Joiner;
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum;
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
import com.xiaojukeji.kafka.manager.common.entity.KafkaVersion;
import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics;
import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant;
import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionAttributeDTO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
@@ -123,11 +125,19 @@ public class JmxServiceImpl implements JmxService {
return null;
}
TopicMetrics metrics = null;
List<BrokerMetrics> brokerMetricsList = new ArrayList<>();
for (Integer brokerId : topicMetadata.getBrokerIdSet()) {
TopicMetrics subMetrics = getTopicMetrics(clusterId, brokerId, topicName, metricsCode, byAdd);
if (ValidateUtils.isNull(subMetrics)) {
continue;
}
BrokerMetrics brokerMetrics = new BrokerMetrics(clusterId, brokerId);
brokerMetrics.setMetricsMap(subMetrics.getMetricsMap());
brokerMetricsList.add(brokerMetrics);
if (ValidateUtils.isNull(metrics)) {
metrics = new TopicMetrics(clusterId, topicName);
}
@@ -137,6 +147,10 @@ public class JmxServiceImpl implements JmxService {
metrics.mergeByMax(subMetrics);
}
}
if (!ValidateUtils.isNull(metrics)) {
metrics.setBrokerMetricsList(brokerMetricsList);
}
return metrics;
}
@@ -169,6 +183,77 @@ public class JmxServiceImpl implements JmxService {
return metrics;
}
@Override
public String getTopicCodeCValue(Long clusterId, String topicName) {
TopicMetadata topicMetadata = PhysicalClusterMetadataManager.getTopicMetadata(clusterId, topicName);
if (topicMetadata == null) {
return null;
}
MbeanV2 topicCodeCMBean = null;
List<MbeanV2> mbeanV2List = MbeanNameUtilV2.getMbeanList(KafkaMetricsCollections.TOPIC_BASIC_PAGE_METRICS);
if (!ValidateUtils.isEmptyList(mbeanV2List)) {
topicCodeCMBean = mbeanV2List.stream()
.filter(mbeanV2 -> "TopicCodeC".equals(mbeanV2.getFieldName()))
.findFirst()
.orElse(null);
}
if (topicCodeCMBean == null) {
return null;
}
KafkaVersion kafkaVersion;
Set<String> codeCValues = new HashSet<>();
TopicMetrics metrics = new TopicMetrics(clusterId, topicName);
for (Integer brokerId : topicMetadata.getBrokerIdSet()) {
JmxConnectorWrap jmxConnectorWrap = PhysicalClusterMetadataManager.getJmxConnectorWrap(clusterId, brokerId);
if (ValidateUtils.isNull(jmxConnectorWrap)|| !jmxConnectorWrap.checkJmxConnectionAndInitIfNeed()) {
continue;
}
kafkaVersion = physicalClusterMetadataManager.getKafkaVersion(clusterId, brokerId);
// 如果是高版本,需要获取指标{kafka.server:type=AppIdTopicMetrics,name=RecordCompression,appId=*,topic=xxx}
if (kafkaVersion.getVersionNum() > KafkaVersion.VERSION_0_10_3.longValue()) {
try {
ObjectName objectNameRegX = new ObjectName(topicCodeCMBean.getObjectName(kafkaVersion.getVersionNum())
+ "*,topic=" + topicName);
QueryExp exp = Query.match(Query.attr("Value"), Query.value("*"));
Set<ObjectName> objectNames = jmxConnectorWrap.queryNames(objectNameRegX, exp);
for (ObjectName objectName : objectNames) {
if (objectName.toString().indexOf(",appId=admin,") == -1) {
String value = (String) jmxConnectorWrap.getAttribute(objectName, "Value");
if (!codeCValues.contains(value)) {
codeCValues.add(value);
}
}
}
} catch (Exception e) {
LOGGER.error("get topic codec metrics failed, clusterId:{} brokerId:{} topicName:{} mbean:{}.",
clusterId, brokerId, topicName, topicCodeCMBean, e
);
}
} else {
// 低版本沿用老逻辑...
try {
getAndSupplyAttributes2BaseMetrics(
metrics,
jmxConnectorWrap,
topicCodeCMBean,
new ObjectName(topicCodeCMBean.getObjectName(kafkaVersion.getVersionNum()) + ",topic=" + topicName)
);
} catch (Exception e) {
LOGGER.error("get topic codec metrics failed, clusterId:{} topicName:{} mbean:{}.",
clusterId, topicName, topicCodeCMBean, e
);
}
}
}
codeCValues.addAll(ListUtils.string2StrList(metrics.getSpecifiedMetrics("TopicCodeCValue", String.class)));
return Joiner.on(",").join(codeCValues);
}
private void getAndSupplyAttributes2BaseMetrics(BaseMetrics metrics,
JmxConnectorWrap jmxConnectorWrap,
MbeanV2 mbeanV2,

View File

@@ -88,6 +88,7 @@ public class LogicalClusterServiceImpl implements LogicalClusterService {
public LogicalCluster getLogicalCluster(Long logicalClusterId) {
LogicalClusterDO logicalClusterDO = logicClusterMetadataManager.getLogicalCluster(logicalClusterId);
if (ValidateUtils.isNull(logicalClusterDO)) {
LOGGER.warn("class=LogicalClusterServiceImpl||method=getLogicalCluster||logicalClusterId={}||msg=logicalClusterDO is null!", logicalClusterId);
return null;
}
return convert2LogicalCluster(logicalClusterDO);
@@ -223,8 +224,7 @@ public class LogicalClusterServiceImpl implements LogicalClusterService {
return ResultStatus.SUCCESS;
}
} catch (DuplicateKeyException e) {
LOGGER.error("create logical cluster failed, name already existed, newLogicalClusterDO:{}.",
logicalClusterDO, e);
LOGGER.error("create logical cluster failed, name already existed, newLogicalClusterDO:{}.", logicalClusterDO, e);
return ResultStatus.RESOURCE_ALREADY_EXISTED;
} catch (Exception e) {
LOGGER.error("create logical cluster failed, mysql error, newLogicalClusterDO:{}.", logicalClusterDO, e);
@@ -264,6 +264,7 @@ public class LogicalClusterServiceImpl implements LogicalClusterService {
}
return ResultStatus.RESOURCE_NOT_EXIST;
} catch (Exception e) {
LOGGER.error("class=LogicalClusterServiceImpl||method=getById||errMsg={}||logicalClusterId={}", e.getMessage(), logicalClusterId, e);
return ResultStatus.MYSQL_ERROR;
}
}

View File

@@ -68,6 +68,8 @@ public class RegionServiceImpl implements RegionService {
LOGGER.error("create region failed, newRegionDO:{}.", regionDO, e);
return ResultStatus.MYSQL_ERROR;
}
LOGGER.warn("class=RegionServiceImpl||method=createRegion||regionDO={}||msg=create region failed", regionDO);
return ResultStatus.MYSQL_ERROR;
}
@@ -107,6 +109,7 @@ public class RegionServiceImpl implements RegionService {
if (regionDao.updateById(newRegionDO) > 0) {
return ResultStatus.SUCCESS;
}
LOGGER.warn("class=RegionServiceImpl||method=updateRegion||newRegionDO={}||msg=update region failed", newRegionDO);
return ResultStatus.MYSQL_ERROR;
}
List<Integer> newBrokerIdList = ListUtils.string2IntList(newRegionDO.getBrokerList());
@@ -125,6 +128,7 @@ public class RegionServiceImpl implements RegionService {
} catch (Exception e) {
LOGGER.error("update region failed, newRegionDO:{}", newRegionDO, e);
}
LOGGER.warn("class=RegionServiceImpl||method=updateRegion||newRegionDO={}||msg=update region failed", newRegionDO);
return ResultStatus.MYSQL_ERROR;
}

View File

@@ -65,9 +65,6 @@ public class TopicManagerServiceImpl implements TopicManagerService {
@Autowired
private LogicalClusterMetadataManager logicalClusterMetadataManager;
@Autowired
private LogicalClusterService logicalClusterService;
@Autowired
private JmxService jmxService;
@@ -77,6 +74,9 @@ public class TopicManagerServiceImpl implements TopicManagerService {
@Autowired
private ClusterService clusterService;
@Autowired
private RegionService regionService;
@Override
public List<TopicDO> listAll() {
try {
@@ -288,7 +288,6 @@ public class TopicManagerServiceImpl implements TopicManagerService {
private List<TopicDTO> getTopics(ClusterDO clusterDO,
Map<String, AppDO> appMap,
Map<String, TopicDO> topicMap) {
Boolean needAuth = !ValidateUtils.isBlank(clusterDO.getSecurityProperties());
List<TopicDTO> dtoList = new ArrayList<>();
for (String topicName: PhysicalClusterMetadataManager.getTopicNameList(clusterDO.getId())) {
LogicalClusterDO logicalClusterDO = logicalClusterMetadataManager.getTopicLogicalCluster(
@@ -305,7 +304,7 @@ public class TopicManagerServiceImpl implements TopicManagerService {
dto.setLogicalClusterId(logicalClusterDO.getId());
dto.setLogicalClusterName(logicalClusterDO.getName());
dto.setTopicName(topicName);
dto.setNeedAuth(needAuth);
dto.setNeedAuth(Boolean.TRUE);
TopicDO topicDO = topicMap.get(topicName);
if (ValidateUtils.isNull(topicDO)) {
@@ -371,12 +370,14 @@ public class TopicManagerServiceImpl implements TopicManagerService {
TopicMetadata topicMetaData = PhysicalClusterMetadataManager.getTopicMetadata(physicalClusterId, topicName);
if (ValidateUtils.isNull(topicMetaData)) {
// Topic不存在
LOGGER.warn("class=TopicManagerServiceImpl||method=getTopicAuthorizedApps||physicalClusterId={}||topicName={}||msg=topicMetaData is null", physicalClusterId,topicName);
return new ArrayList<>();
}
List<AuthorityDO> authorityDOList = authorityService.getAuthorityByTopic(physicalClusterId, topicName);
if (ValidateUtils.isEmptyList(authorityDOList)) {
// 无任何权限
LOGGER.warn("class=TopicManagerServiceImpl||method=getTopicAuthorizedApps||physicalClusterId={}||topicName={}||msg=authorityDOList is null", physicalClusterId,topicName);
return new ArrayList<>();
}
@@ -489,12 +490,17 @@ public class TopicManagerServiceImpl implements TopicManagerService {
PhysicalClusterMetadataManager.getZKConfig(physicalClusterId),
topicName
);
List<RegionDO> regionDOList = regionService.getRegionListByTopicName(physicalClusterId, topicName);
List<String> regionNameList = regionDOList.stream().map(RegionDO::getName).collect(Collectors.toList());
TopicDO topicDO = getByTopicName(physicalClusterId, topicName);
if (ValidateUtils.isNull(topicDO)) {
return new Result<>(convert2RdTopicBasic(clusterDO, topicName, null, null, properties));
return new Result<>(convert2RdTopicBasic(clusterDO, topicName, null, null, regionNameList, properties));
}
AppDO appDO = appService.getByAppId(topicDO.getAppId());
return new Result<>(convert2RdTopicBasic(clusterDO, topicName, topicDO, appDO, properties));
return new Result<>(convert2RdTopicBasic(clusterDO, topicName, topicDO, appDO, regionNameList, properties));
}
@Override
@@ -527,6 +533,7 @@ public class TopicManagerServiceImpl implements TopicManagerService {
String topicName,
TopicDO topicDO,
AppDO appDO,
List<String> regionNameList,
Properties properties) {
RdTopicBasic rdTopicBasic = new RdTopicBasic();
rdTopicBasic.setClusterId(clusterDO.getId());
@@ -539,6 +546,7 @@ public class TopicManagerServiceImpl implements TopicManagerService {
if (!ValidateUtils.isNull(topicDO)) {
rdTopicBasic.setDescription(topicDO.getDescription());
}
rdTopicBasic.setRegionNameList(regionNameList);
rdTopicBasic.setProperties(properties);
rdTopicBasic.setRetentionTime(KafkaZookeeperUtils.getTopicRetentionTime(properties));
return rdTopicBasic;

View File

@@ -13,7 +13,6 @@ import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionOffsetDTO;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.*;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO;
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxConstant;
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.BrokerMetadata;
@@ -44,6 +43,7 @@ import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author limeng
@@ -80,6 +80,9 @@ public class TopicServiceImpl implements TopicService {
@Autowired
private ClusterService clusterService;
@Autowired
private RegionService regionService;
@Override
public List<TopicMetricsDO> getTopicMetricsFromDB(Long clusterId, String topicName, Date startTime, Date endTime) {
try {
@@ -228,25 +231,10 @@ public class TopicServiceImpl implements TopicService {
basicDTO.setPrincipals(appDO.getPrincipals());
}
LogicalClusterDO logicalClusterDO = logicalClusterMetadataManager.getTopicLogicalCluster(clusterId, topicName);
if (!ValidateUtils.isNull(logicalClusterDO)) {
basicDTO.setRegion(logicalClusterDO.getName());
}
List<RegionDO> regionDOList = regionService.getRegionListByTopicName(clusterId, topicName);
basicDTO.setRegionNameList(regionDOList.stream().map(RegionDO::getName).collect(Collectors.toList()));
TopicMetrics metrics = jmxService.getTopicMetrics(
clusterId,
topicName,
KafkaMetricsCollections.TOPIC_BASIC_PAGE_METRICS,
true
);
String compressionType = null;
if (!ValidateUtils.isNull(metrics)) {
compressionType = metrics.getSpecifiedMetrics("TopicCodeCValue", String.class);
}
basicDTO.setTopicCodeC(
ListUtils.strList2String(new ArrayList<>(new HashSet<>(ListUtils.string2StrList(compressionType))))
);
basicDTO.setTopicCodeC(jmxService.getTopicCodeCValue(clusterId, topicName));
basicDTO.setScore(100);
return basicDTO;
}
@@ -469,6 +457,7 @@ public class TopicServiceImpl implements TopicService {
return overview;
}
overview.setByteIn(metrics.getBytesInPerSecOneMinuteRate(null));
overview.setByteOut(metrics.getBytesOutPerSecOneMinuteRate(null));
overview.setProduceRequest(metrics.getTotalProduceRequestsPerSecOneMinuteRate(null));
return overview;
}

View File

@@ -19,6 +19,9 @@ public class ConfigUtils {
@Value(value = "${spring.profiles.active}")
private String kafkaManagerEnv;
@Value(value = "${custom.store-metrics-task.save-days}")
private Integer maxMetricsSaveDays;
public String getIdc() {
return idc;
}
@@ -42,4 +45,12 @@ public class ConfigUtils {
public void setKafkaManagerEnv(String kafkaManagerEnv) {
this.kafkaManagerEnv = kafkaManagerEnv;
}
public Integer getMaxMetricsSaveDays() {
return maxMetricsSaveDays;
}
public void setMaxMetricsSaveDays(Integer maxMetricsSaveDays) {
this.maxMetricsSaveDays = maxMetricsSaveDays;
}
}

View File

@@ -1,5 +1,6 @@
package com.xiaojukeji.kafka.manager.service.utils;
import com.alibaba.fastjson.JSON;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
@@ -11,6 +12,8 @@ import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.kafka.common.errors.*;
import org.apache.kafka.common.security.JaasUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConversions;
import scala.collection.Seq;
@@ -22,6 +25,9 @@ import java.util.*;
* @date 20/4/22
*/
public class TopicCommands {
private static final Logger LOGGER = LoggerFactory.getLogger(TopicCommands.class);
public static ResultStatus createTopic(ClusterDO clusterDO,
String topicName,
Integer partitionNum,
@@ -56,16 +62,28 @@ public class TopicCommands {
false
);
} catch (NullPointerException e) {
LOGGER.error("class=TopicCommands||method=createTopic||errMsg={}||clusterDO={}||topicName={}||partitionNum={}||replicaNum={}||brokerIdList={}||config={}",
e.getMessage(), clusterDO, topicName, partitionNum, replicaNum, JSON.toJSONString(brokerIdList), config, e);
return ResultStatus.TOPIC_OPERATION_PARAM_NULL_POINTER;
} catch (InvalidPartitionsException e) {
LOGGER.error("class=TopicCommands||method=createTopic||errMsg={}||clusterDO={}||topicName={}||partitionNum={}||replicaNum={}||brokerIdList={}||config={}",
e.getMessage(), clusterDO, topicName,partitionNum,replicaNum,JSON.toJSONString(brokerIdList),config, e);
return ResultStatus.TOPIC_OPERATION_PARTITION_NUM_ILLEGAL;
} catch (InvalidReplicationFactorException e) {
LOGGER.error("class=TopicCommands||method=createTopic||errMsg={}||clusterDO={}||topicName={}||partitionNum={}||replicaNum={}||brokerIdList={}||config={}",
e.getMessage(), clusterDO, topicName,partitionNum,replicaNum,JSON.toJSONString(brokerIdList),config, e);
return ResultStatus.BROKER_NUM_NOT_ENOUGH;
} catch (TopicExistsException | ZkNodeExistsException e) {
LOGGER.error("class=TopicCommands||method=createTopic||errMsg={}||clusterDO={}||topicName={}||partitionNum={}||replicaNum={}||brokerIdList={}||config={}",
e.getMessage(), clusterDO, topicName,partitionNum,replicaNum,JSON.toJSONString(brokerIdList),config, e);
return ResultStatus.TOPIC_OPERATION_TOPIC_EXISTED;
} catch (InvalidTopicException e) {
LOGGER.error("class=TopicCommands||method=createTopic||errMsg={}||clusterDO={}||topicName={}||partitionNum={}||replicaNum={}||brokerIdList={}||config={}",
e.getMessage(), clusterDO, topicName,partitionNum,replicaNum,JSON.toJSONString(brokerIdList),config, e);
return ResultStatus.TOPIC_OPERATION_TOPIC_NAME_ILLEGAL;
} catch (Throwable t) {
LOGGER.error("class=TopicCommands||method=createTopic||errMsg={}||clusterDO={}||topicName={}||partitionNum={}||replicaNum={}||brokerIdList={}||config={}",
t.getMessage(), clusterDO, topicName,partitionNum,replicaNum,JSON.toJSONString(brokerIdList),config, t);
return ResultStatus.TOPIC_OPERATION_UNKNOWN_ERROR;
} finally {
if (zkUtils != null) {
@@ -86,10 +104,13 @@ public class TopicCommands {
);
AdminUtils.deleteTopic(zkUtils, topicName);
} catch (UnknownTopicOrPartitionException e) {
LOGGER.error("class=TopicCommands||method=deleteTopic||errMsg={}||clusterDO={}||topicName={}", e.getMessage(), clusterDO, topicName, e);
return ResultStatus.TOPIC_OPERATION_UNKNOWN_TOPIC_PARTITION;
} catch (ZkNodeExistsException e) {
LOGGER.error("class=TopicCommands||method=deleteTopic||errMsg={}||clusterDO={}||topicName={}", e.getMessage(), clusterDO, topicName, e);
return ResultStatus.TOPIC_OPERATION_TOPIC_IN_DELETING;
} catch (Throwable t) {
LOGGER.error("class=TopicCommands||method=deleteTopic||errMsg={}||clusterDO={}||topicName={}", t.getMessage(), clusterDO, topicName, t);
return ResultStatus.TOPIC_OPERATION_UNKNOWN_ERROR;
} finally {
if (zkUtils != null) {
@@ -108,13 +129,15 @@ public class TopicCommands {
Constant.DEFAULT_SESSION_TIMEOUT_UNIT_MS,
JaasUtils.isZkSecurityEnabled()
);
AdminUtils.changeTopicConfig(zkUtils, topicName, config);
} catch (AdminOperationException e) {
LOGGER.error("class=TopicCommands||method=modifyTopicConfig||errMsg={}||clusterDO={}||topicName={}||config={}", e.getMessage(), clusterDO, topicName,config, e);
return ResultStatus.TOPIC_OPERATION_UNKNOWN_TOPIC_PARTITION;
} catch (InvalidConfigurationException e) {
LOGGER.error("class=TopicCommands||method=modifyTopicConfig||errMsg={}||clusterDO={}||topicName={}||config={}", e.getMessage(), clusterDO, topicName,config, e);
return ResultStatus.TOPIC_OPERATION_TOPIC_CONFIG_ILLEGAL;
} catch (Throwable t) {
LOGGER.error("class=TopicCommands||method=modifyTopicConfig||errMsg={}||clusterDO={}||topicName={}||config={}", t.getMessage(), clusterDO, topicName,config, t);
return ResultStatus.TOPIC_OPERATION_UNKNOWN_ERROR;
} finally {
if (zkUtils != null) {
@@ -174,6 +197,8 @@ public class TopicCommands {
true
);
} catch (Throwable t) {
LOGGER.error("class=TopicCommands||method=expandTopic||errMsg={}||clusterDO={}||topicName={}||partitionNum={}||brokerIdList={}"
, t.getMessage(), clusterDO, topicName, partitionNum, JSON.toJSONString(brokerIdList), t);
return ResultStatus.TOPIC_OPERATION_UNKNOWN_ERROR;
} finally {
if (zkUtils != null) {