mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-03 02:52:08 +08:00
@@ -17,5 +17,5 @@ public interface OperateRecordService {
|
||||
|
||||
int insert(String operator, ModuleEnum module, String resourceName, OperateEnum operate, Map<String, String> content);
|
||||
|
||||
List<OperateRecordDO> queryByCondt(OperateRecordDTO dto);
|
||||
List<OperateRecordDO> queryByCondition(OperateRecordDTO dto);
|
||||
}
|
||||
|
||||
@@ -22,6 +22,8 @@ import java.util.Map;
|
||||
public interface TopicManagerService {
|
||||
List<TopicDO> listAll();
|
||||
|
||||
List<TopicDO> getByClusterIdFromCache(Long clusterId);
|
||||
|
||||
List<TopicDO> getByClusterId(Long clusterId);
|
||||
|
||||
TopicDO getByTopicName(Long clusterId, String topicName);
|
||||
@@ -30,6 +32,15 @@ public interface TopicManagerService {
|
||||
|
||||
Map<String, List<Double>> getTopicMaxAvgBytesIn(Long clusterId, Integer latestDay, Double minMaxAvgBytesIn);
|
||||
|
||||
/**
|
||||
* 获取指定时间范围内Topic的峰值均值流量
|
||||
* @param clusterId 集群ID
|
||||
* @param topicName Topic名称
|
||||
* @param startTime 开始时间
|
||||
* @param endTime 结束时间
|
||||
* @param maxAvgDay 最大几天的均值
|
||||
* @return
|
||||
*/
|
||||
Double getTopicMaxAvgBytesIn(Long clusterId, String topicName, Date startTime, Date endTime, Integer maxAvgDay);
|
||||
|
||||
TopicStatisticsDO getByTopicAndDay(Long clusterId, String topicName, String gmtDay);
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
package com.xiaojukeji.kafka.manager.service.service.gateway.impl;
|
||||
|
||||
import com.alibaba.fastjson.JSONObject;
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.ModuleEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.OperateEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.OperationStatusEnum;
|
||||
@@ -10,6 +9,7 @@ import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.OperateRecordDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.KafkaAclDO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.dao.gateway.AuthorityDao;
|
||||
import com.xiaojukeji.kafka.manager.dao.gateway.KafkaAclDao;
|
||||
@@ -119,7 +119,7 @@ public class AuthorityServiceImpl implements AuthorityService {
|
||||
operateRecordDO.setModuleId(ModuleEnum.AUTHORITY.getCode());
|
||||
operateRecordDO.setOperateId(OperateEnum.DELETE.getCode());
|
||||
operateRecordDO.setResource(topicName);
|
||||
operateRecordDO.setContent(JSONObject.toJSONString(content));
|
||||
operateRecordDO.setContent(JsonUtils.toJSONString(content));
|
||||
operateRecordDO.setOperator(operator);
|
||||
operateRecordService.insert(operateRecordDO);
|
||||
} catch (Exception e) {
|
||||
@@ -149,7 +149,7 @@ public class AuthorityServiceImpl implements AuthorityService {
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("get authority failed, clusterId:{} topicName:{}.", clusterId, topicName, e);
|
||||
}
|
||||
return null;
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -162,14 +162,12 @@ public class AuthorityServiceImpl implements AuthorityService {
|
||||
}
|
||||
if (ValidateUtils.isEmptyList(doList)) {
|
||||
return new ArrayList<>();
|
||||
} else {
|
||||
assert doList != null;
|
||||
// 过滤权限列表中access=0的
|
||||
List<AuthorityDO> newList = doList.stream()
|
||||
.filter(authorityDO -> !TopicAuthorityEnum.DENY.getCode().equals(authorityDO.getAccess()))
|
||||
.collect(Collectors.toList());
|
||||
return newList;
|
||||
}
|
||||
|
||||
// 去除掉权限列表中无权限的数据
|
||||
return doList.stream()
|
||||
.filter(authorityDO -> !TopicAuthorityEnum.DENY.getCode().equals(authorityDO.getAccess()))
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -26,21 +26,27 @@ public class TopicConnectionServiceImpl implements TopicConnectionService {
|
||||
@Autowired
|
||||
private TopicConnectionDao topicConnectionDao;
|
||||
|
||||
private int splitInterval = 50;
|
||||
|
||||
@Override
|
||||
public void batchAdd(List<TopicConnectionDO> doList) {
|
||||
if (ValidateUtils.isEmptyList(doList)) {
|
||||
return;
|
||||
}
|
||||
int allSize = doList.size();
|
||||
int successSize = 0;
|
||||
|
||||
int count = 0;
|
||||
for (TopicConnectionDO connectionDO: doList) {
|
||||
try {
|
||||
count += topicConnectionDao.replace(connectionDO);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("class=TopicConnectionServiceImpl||method=batchAdd||connectionDO={}||errMsg={}", connectionDO, e.getMessage());
|
||||
}
|
||||
int part = doList.size() / splitInterval;
|
||||
for (int i = 0; i < part; ++i) {
|
||||
List<TopicConnectionDO> subList = doList.subList(0, splitInterval);
|
||||
successSize += topicConnectionDao.batchReplace(subList);
|
||||
doList.subList(0, splitInterval).clear();
|
||||
}
|
||||
LOGGER.info("class=TopicConnectionServiceImpl||method=batchAdd||allSize={}||successSize={}", doList.size(), count);
|
||||
if (!ValidateUtils.isEmptyList(doList)) {
|
||||
successSize += topicConnectionDao.batchReplace(doList);
|
||||
}
|
||||
LOGGER.info("class=TopicConnectionServiceImpl||method=batchAdd||allSize={}||successSize={}", allSize, successSize);
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -66,7 +66,10 @@ public class AdminServiceImpl implements AdminService {
|
||||
String applicant,
|
||||
String operator) {
|
||||
List<Integer> fullBrokerIdList = regionService.getFullBrokerIdList(clusterDO.getId(), regionId, brokerIdList);
|
||||
if (PhysicalClusterMetadataManager.getNotAliveBrokerNum(clusterDO.getId(), fullBrokerIdList) > DEFAULT_DEAD_BROKER_LIMIT_NUM) {
|
||||
|
||||
Long notAliveBrokerNum = PhysicalClusterMetadataManager.getNotAliveBrokerNum(clusterDO.getId(), fullBrokerIdList);
|
||||
if (notAliveBrokerNum >= fullBrokerIdList.size() || notAliveBrokerNum > DEFAULT_DEAD_BROKER_LIMIT_NUM) {
|
||||
// broker全挂了,或者是挂的数量大于了DEFAULT_DEAD_BROKER_LIMIT_NUM时, 则认为broker参数不合法
|
||||
return ResultStatus.BROKER_NOT_EXIST;
|
||||
}
|
||||
|
||||
|
||||
@@ -82,6 +82,7 @@ public class ClusterServiceImpl implements ClusterService {
|
||||
content.put("security properties", clusterDO.getSecurityProperties());
|
||||
content.put("jmx properties", clusterDO.getJmxProperties());
|
||||
operateRecordService.insert(operator, ModuleEnum.CLUSTER, clusterDO.getClusterName(), OperateEnum.ADD, content);
|
||||
|
||||
if (clusterDao.insert(clusterDO) <= 0) {
|
||||
LOGGER.error("add new cluster failed, clusterDO:{}.", clusterDO);
|
||||
return ResultStatus.MYSQL_ERROR;
|
||||
|
||||
@@ -41,8 +41,8 @@ public class OperateRecordServiceImpl implements OperateRecordService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OperateRecordDO> queryByCondt(OperateRecordDTO dto) {
|
||||
return operateRecordDao.queryByCondt(
|
||||
public List<OperateRecordDO> queryByCondition(OperateRecordDTO dto) {
|
||||
return operateRecordDao.queryByCondition(
|
||||
dto.getModuleId(),
|
||||
dto.getOperateId(),
|
||||
dto.getOperator(),
|
||||
|
||||
@@ -95,6 +95,14 @@ public class TopicManagerServiceImpl implements TopicManagerService {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TopicDO> getByClusterIdFromCache(Long clusterId) {
|
||||
if (clusterId == null) {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
return topicDao.getByClusterIdFromCache(clusterId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TopicDO> getByClusterId(Long clusterId) {
|
||||
if (clusterId == null) {
|
||||
@@ -139,12 +147,14 @@ public class TopicManagerServiceImpl implements TopicManagerService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Double getTopicMaxAvgBytesIn(Long clusterId,
|
||||
String topicName,
|
||||
Date startTime,
|
||||
Date endTime,
|
||||
Integer maxAvgDay) {
|
||||
return topicStatisticsDao.getTopicMaxAvgBytesIn(clusterId, topicName, startTime, endTime, maxAvgDay);
|
||||
public Double getTopicMaxAvgBytesIn(Long clusterId, String topicName, Date startTime, Date endTime, Integer maxAvgDay) {
|
||||
try {
|
||||
return topicStatisticsDao.getTopicMaxAvgBytesIn(clusterId, topicName, startTime, endTime, maxAvgDay);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("class=TopicManagerServiceImpl||method=getTopicMaxAvgBytesIn||clusterId={}||topicName={}||startTime={}||endTime={}||maxAvgDay={}||errMsg={}",
|
||||
clusterId, topicName, startTime, endTime, maxAvgDay, e.getMessage());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -400,7 +400,7 @@ public class TopicServiceImpl implements TopicService {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
List<TopicDO> topicDOList = topicManagerService.getByClusterId(clusterId);
|
||||
List<TopicDO> topicDOList = topicManagerService.getByClusterIdFromCache(clusterId);
|
||||
if (ValidateUtils.isNull(topicDOList)) {
|
||||
topicDOList = new ArrayList<>();
|
||||
}
|
||||
|
||||
@@ -17,7 +17,7 @@ public class ConfigUtils {
|
||||
private String kafkaManagerEnv;
|
||||
|
||||
@Value(value = "${custom.store-metrics-task.save-days}")
|
||||
private Integer maxMetricsSaveDays;
|
||||
private Long maxMetricsSaveDays;
|
||||
|
||||
public String getIdc() {
|
||||
return idc;
|
||||
@@ -35,11 +35,11 @@ public class ConfigUtils {
|
||||
this.kafkaManagerEnv = kafkaManagerEnv;
|
||||
}
|
||||
|
||||
public Integer getMaxMetricsSaveDays() {
|
||||
public Long getMaxMetricsSaveDays() {
|
||||
return maxMetricsSaveDays;
|
||||
}
|
||||
|
||||
public void setMaxMetricsSaveDays(Integer maxMetricsSaveDays) {
|
||||
public void setMaxMetricsSaveDays(Long maxMetricsSaveDays) {
|
||||
this.maxMetricsSaveDays = maxMetricsSaveDays;
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user