mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 11:52:08 +08:00
v2.4.0 be code
This commit is contained in:
@@ -46,7 +46,7 @@ public enum OperateEnum {
|
||||
|
||||
public static boolean validate(Integer code) {
|
||||
if (code == null) {
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
for (OperateEnum state : OperateEnum.values()) {
|
||||
if (state.getCode() == code) {
|
||||
|
||||
@@ -81,11 +81,6 @@ public class OperateRecordDTO {
|
||||
}
|
||||
|
||||
public boolean legal() {
|
||||
if (!ModuleEnum.validate(moduleId) ||
|
||||
(!ValidateUtils.isNull(operateId) && OperateEnum.validate(operateId))
|
||||
) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
return !ValidateUtils.isNull(moduleId) && ModuleEnum.validate(moduleId) && OperateEnum.validate(operateId);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
package com.xiaojukeji.kafka.manager.openapi.common.vo;
|
||||
package com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic;
|
||||
|
||||
import io.swagger.annotations.ApiModel;
|
||||
import io.swagger.annotations.ApiModelProperty;
|
||||
@@ -14,7 +14,6 @@ public class TopicStatisticMetricsVO {
|
||||
|
||||
public TopicStatisticMetricsVO(Double peakBytesIn) {
|
||||
this.peakBytesIn = peakBytesIn;
|
||||
|
||||
}
|
||||
|
||||
public Double getPeakBytesIn() {
|
||||
@@ -17,5 +17,5 @@ public interface OperateRecordService {
|
||||
|
||||
int insert(String operator, ModuleEnum module, String resourceName, OperateEnum operate, Map<String, String> content);
|
||||
|
||||
List<OperateRecordDO> queryByCondt(OperateRecordDTO dto);
|
||||
List<OperateRecordDO> queryByCondition(OperateRecordDTO dto);
|
||||
}
|
||||
|
||||
@@ -32,6 +32,15 @@ public interface TopicManagerService {
|
||||
|
||||
Map<String, List<Double>> getTopicMaxAvgBytesIn(Long clusterId, Integer latestDay, Double minMaxAvgBytesIn);
|
||||
|
||||
/**
|
||||
* 获取指定时间范围内Topic的峰值均值流量
|
||||
* @param clusterId 集群ID
|
||||
* @param topicName Topic名称
|
||||
* @param startTime 开始时间
|
||||
* @param endTime 结束时间
|
||||
* @param maxAvgDay 最大几天的均值
|
||||
* @return
|
||||
*/
|
||||
Double getTopicMaxAvgBytesIn(Long clusterId, String topicName, Date startTime, Date endTime, Integer maxAvgDay);
|
||||
|
||||
TopicStatisticsDO getByTopicAndDay(Long clusterId, String topicName, String gmtDay);
|
||||
|
||||
@@ -66,7 +66,10 @@ public class AdminServiceImpl implements AdminService {
|
||||
String applicant,
|
||||
String operator) {
|
||||
List<Integer> fullBrokerIdList = regionService.getFullBrokerIdList(clusterDO.getId(), regionId, brokerIdList);
|
||||
if (PhysicalClusterMetadataManager.getNotAliveBrokerNum(clusterDO.getId(), fullBrokerIdList) > DEFAULT_DEAD_BROKER_LIMIT_NUM) {
|
||||
|
||||
Long notAliveBrokerNum = PhysicalClusterMetadataManager.getNotAliveBrokerNum(clusterDO.getId(), fullBrokerIdList);
|
||||
if (notAliveBrokerNum >= fullBrokerIdList.size() || notAliveBrokerNum > DEFAULT_DEAD_BROKER_LIMIT_NUM) {
|
||||
// broker全挂了,或者是挂的数量大于了DEFAULT_DEAD_BROKER_LIMIT_NUM时, 则认为broker参数不合法
|
||||
return ResultStatus.BROKER_NOT_EXIST;
|
||||
}
|
||||
|
||||
|
||||
@@ -82,6 +82,7 @@ public class ClusterServiceImpl implements ClusterService {
|
||||
content.put("security properties", clusterDO.getSecurityProperties());
|
||||
content.put("jmx properties", clusterDO.getJmxProperties());
|
||||
operateRecordService.insert(operator, ModuleEnum.CLUSTER, clusterDO.getClusterName(), OperateEnum.ADD, content);
|
||||
|
||||
if (clusterDao.insert(clusterDO) <= 0) {
|
||||
LOGGER.error("add new cluster failed, clusterDO:{}.", clusterDO);
|
||||
return ResultStatus.MYSQL_ERROR;
|
||||
|
||||
@@ -41,8 +41,8 @@ public class OperateRecordServiceImpl implements OperateRecordService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OperateRecordDO> queryByCondt(OperateRecordDTO dto) {
|
||||
return operateRecordDao.queryByCondt(
|
||||
public List<OperateRecordDO> queryByCondition(OperateRecordDTO dto) {
|
||||
return operateRecordDao.queryByCondition(
|
||||
dto.getModuleId(),
|
||||
dto.getOperateId(),
|
||||
dto.getOperator(),
|
||||
|
||||
@@ -147,12 +147,14 @@ public class TopicManagerServiceImpl implements TopicManagerService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Double getTopicMaxAvgBytesIn(Long clusterId,
|
||||
String topicName,
|
||||
Date startTime,
|
||||
Date endTime,
|
||||
Integer maxAvgDay) {
|
||||
return topicStatisticsDao.getTopicMaxAvgBytesIn(clusterId, topicName, startTime, endTime, maxAvgDay);
|
||||
public Double getTopicMaxAvgBytesIn(Long clusterId, String topicName, Date startTime, Date endTime, Integer maxAvgDay) {
|
||||
try {
|
||||
return topicStatisticsDao.getTopicMaxAvgBytesIn(clusterId, topicName, startTime, endTime, maxAvgDay);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("class=TopicManagerServiceImpl||method=getTopicMaxAvgBytesIn||clusterId={}||topicName={}||startTime={}||endTime={}||maxAvgDay={}||errMsg={}",
|
||||
clusterId, topicName, startTime, endTime, maxAvgDay, e.getMessage());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
@@ -14,5 +14,5 @@ public interface OperateRecordDao {
|
||||
|
||||
int insert(OperateRecordDO operateRecordDO);
|
||||
|
||||
List<OperateRecordDO> queryByCondt(Integer moduleId, Integer operateId, String operator, Date startTime, Date endTime);
|
||||
List<OperateRecordDO> queryByCondition(Integer moduleId, Integer operateId, String operator, Date startTime, Date endTime);
|
||||
}
|
||||
|
||||
@@ -30,13 +30,13 @@ public class OperateRecordDaoImpl implements OperateRecordDao {
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OperateRecordDO> queryByCondt(Integer moduleId, Integer operateId, String operator, Date startTime, Date endTime) {
|
||||
public List<OperateRecordDO> queryByCondition(Integer moduleId, Integer operateId, String operator, Date startTime, Date endTime) {
|
||||
Map<String, Object> params = new HashMap<>(5);
|
||||
params.put("moduleId", moduleId);
|
||||
params.put("operateId", operateId);
|
||||
params.put("operator", operator);
|
||||
params.put("startTime", startTime);
|
||||
params.put("endTime", endTime);
|
||||
return sqlSession.selectList("OperateRecordDao.queryByCondt", params);
|
||||
return sqlSession.selectList("OperateRecordDao.queryByCondition", params);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -21,7 +21,7 @@
|
||||
)
|
||||
</insert>
|
||||
|
||||
<select id="queryByCondt" parameterType="java.util.Map" resultMap="OperateRecordMap">
|
||||
<select id="queryByCondition" parameterType="java.util.Map" resultMap="OperateRecordMap">
|
||||
select *
|
||||
from operate_record
|
||||
where
|
||||
|
||||
@@ -11,11 +11,13 @@ import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.TopicBusinessInfoVO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.*;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.DateUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.KafkaBillDO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.jmx.JmxAttributeEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicStatisticMetricsVO;
|
||||
import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
|
||||
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
|
||||
import com.xiaojukeji.kafka.manager.service.service.*;
|
||||
@@ -339,4 +341,23 @@ public class NormalTopicController {
|
||||
);
|
||||
}
|
||||
|
||||
@ApiOperation(value = "Topic流量统计信息", notes = "")
|
||||
@RequestMapping(value = "{clusterId}/topics/{topicName}/statistic-metrics", method = RequestMethod.GET)
|
||||
@ResponseBody
|
||||
public Result<TopicStatisticMetricsVO> getTopicStatisticMetrics(@PathVariable Long clusterId,
|
||||
@PathVariable String topicName,
|
||||
@RequestParam(value = "isPhysicalClusterId", required = false) Boolean isPhysicalClusterId,
|
||||
@RequestParam("latest-day") Integer latestDay) {
|
||||
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(clusterId, isPhysicalClusterId);
|
||||
if (ValidateUtils.isNull(physicalClusterId)) {
|
||||
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
|
||||
}
|
||||
|
||||
Double maxAvgBytesIn = topicManagerService.getTopicMaxAvgBytesIn(physicalClusterId, topicName, new Date(DateUtils.getDayStarTime(-1 * latestDay)), new Date(), 1);
|
||||
if (ValidateUtils.isNull(maxAvgBytesIn)) {
|
||||
return Result.buildFrom(ResultStatus.MYSQL_ERROR);
|
||||
}
|
||||
return new Result<>(new TopicStatisticMetricsVO(maxAvgBytesIn));
|
||||
}
|
||||
|
||||
}
|
||||
@@ -36,7 +36,7 @@ public class RdOperateRecordController {
|
||||
if (ValidateUtils.isNull(dto) || !dto.legal()) {
|
||||
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
|
||||
}
|
||||
List<OperateRecordVO> voList = OperateRecordModelConverter.convert2OperateRecordVOList(operateRecordService.queryByCondt(dto));
|
||||
List<OperateRecordVO> voList = OperateRecordModelConverter.convert2OperateRecordVOList(operateRecordService.queryByCondition(dto));
|
||||
if (voList.size() > MAX_RECORD_COUNT) {
|
||||
voList = voList.subList(0, MAX_RECORD_COUNT);
|
||||
}
|
||||
|
||||
@@ -13,8 +13,6 @@ import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicAuthorize
|
||||
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicRequestTimeDetailVO;
|
||||
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.TopicMetadata;
|
||||
import com.xiaojukeji.kafka.manager.openapi.common.vo.TopicOffsetChangedVO;
|
||||
import com.xiaojukeji.kafka.manager.openapi.common.vo.TopicStatisticMetricsVO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.DateUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
|
||||
@@ -30,7 +28,6 @@ import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
|
||||
import java.util.Date;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
@@ -69,27 +66,6 @@ public class ThirdPartTopicController {
|
||||
return new Result<>(vo);
|
||||
}
|
||||
|
||||
@ApiOperation(value = "Topic流量统计信息", notes = "")
|
||||
@RequestMapping(value = "{physicalClusterId}/topics/{topicName}/statistic-metrics", method = RequestMethod.GET)
|
||||
@ResponseBody
|
||||
public Result<TopicStatisticMetricsVO> getTopicStatisticMetrics(@PathVariable Long physicalClusterId,
|
||||
@PathVariable String topicName,
|
||||
@RequestParam("latest-day") Integer latestDay) {
|
||||
try {
|
||||
return new Result<>(new TopicStatisticMetricsVO(topicManagerService.getTopicMaxAvgBytesIn(
|
||||
physicalClusterId,
|
||||
topicName,
|
||||
new Date(DateUtils.getDayStarTime(-1 * latestDay)),
|
||||
new Date(),
|
||||
1
|
||||
)));
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("get topic statistic metrics failed, clusterId:{} topicName:{} latestDay:{}."
|
||||
, physicalClusterId, topicName, latestDay, e);
|
||||
}
|
||||
return Result.buildFrom(ResultStatus.MYSQL_ERROR);
|
||||
}
|
||||
|
||||
@ApiOperation(value = "Topic是否有流量", notes = "")
|
||||
@RequestMapping(value = "{physicalClusterId}/topics/{topicName}/offset-changed", method = RequestMethod.GET)
|
||||
@ResponseBody
|
||||
|
||||
Reference in New Issue
Block a user