v2.8.0_e initialization

1. Test code; open-source users should avoid using it.
2. Contains the Kafka-HA related features.
3. The 2.8.0_e branch was not cut from v2.6.0 but from master at commit-id 462303fca0. The reason: the v2.6.0 code is not the latest 2.x code; the latest 2.x code is the code at commit 462303fca0.
zengqiao
2023-02-13 16:35:43 +08:00
parent 462303fca0
commit e81c0f3040
178 changed files with 9938 additions and 1674 deletions

View File

@@ -83,6 +83,12 @@
<artifactId>spring-boot-starter-logging</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-validation</artifactId>
<version>${spring.boot.version}</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>

View File

@@ -72,6 +72,19 @@ public class NormalAppController {
);
}
@ApiLevel(level = ApiLevelContent.LEVEL_NORMAL_3, rateLimit = 1)
@ApiOperation(value = "App列表", notes = "")
@RequestMapping(value = "apps/{clusterId}", method = RequestMethod.GET)
@ResponseBody
public Result<List<AppVO>> getApps(@PathVariable Long clusterId,
@RequestParam(value = "isPhysicalClusterId", required = false, defaultValue = "false") Boolean isPhysicalClusterId) {
Long physicalClusterId = logicalClusterMetadataManager.getPhysicalClusterId(clusterId, isPhysicalClusterId);
return new Result<>(AppConverter.convert2AppVOList(
appService.getByPrincipalAndClusterId(SpringTool.getUserName(), physicalClusterId))
);
}
@ApiOperation(value = "App基本信息", notes = "")
@RequestMapping(value = "apps/{appId}/basic-info", method = RequestMethod.GET)
@ResponseBody

View File

@@ -24,6 +24,7 @@ import com.xiaojukeji.kafka.manager.service.service.ThrottleService;
import com.xiaojukeji.kafka.manager.service.service.TopicService;
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import com.xiaojukeji.kafka.manager.service.service.ha.HaTopicService;
import com.xiaojukeji.kafka.manager.web.converters.ClusterModelConverter;
import com.xiaojukeji.kafka.manager.web.converters.CommonModelConverter;
import io.swagger.annotations.Api;
@@ -50,6 +51,9 @@ public class NormalClusterController {
@Autowired
private TopicService topicService;
@Autowired
private HaTopicService haTopicService;
@Autowired
private LogicalClusterService logicalClusterService;
@@ -144,6 +148,13 @@ public class NormalClusterController {
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
}
// filter out the standby topics
Map<Long, List<String>> relationMap = haTopicService.getClusterStandbyTopicMap();
Set<String> topics = logicalClusterMetadataManager.getTopicNameSet(logicalClusterId);
if (relationMap != null && relationMap.get(logicalClusterDO.getClusterId()) != null) {
topics.removeAll(new HashSet<>(relationMap.get(logicalClusterDO.getClusterId())));
}
return new Result<>(CommonModelConverter.convert2TopicOverviewVOList(
logicalClusterId,
topicService.getTopicOverviewList(

View File

@@ -1,21 +1,23 @@
package com.xiaojukeji.kafka.manager.web.api.versionone.normal;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import com.xiaojukeji.kafka.manager.common.constant.Constant;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicModifyDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicRetainDTO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO;
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicExpiredVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicMineVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.TopicVO;
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.service.biz.ha.HaASRelationManager;
import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.TopicExpiredService;
import com.xiaojukeji.kafka.manager.service.service.TopicManagerService;
import com.xiaojukeji.kafka.manager.web.converters.TopicMineConverter;
import com.xiaojukeji.kafka.manager.web.utils.ResultCache;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
@@ -40,6 +42,9 @@ public class NormalTopicMineController {
@Autowired
private LogicalClusterMetadataManager logicalClusterMetadataManager;
@Autowired
private HaASRelationManager haASRelationManager;
@ApiOperation(value = "我的Topic", notes = "")
@RequestMapping(value = "topics/mine", method = RequestMethod.GET)
@ResponseBody
@@ -75,14 +80,31 @@ public class NormalTopicMineController {
if (ValidateUtils.isNull(physicalClusterId)) {
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
}
// modify the standby topic first
HaASRelationDO relationDO = haASRelationManager.getASRelation(dto.getClusterId(), dto.getTopicName());
if (relationDO != null) {
if (relationDO.getStandbyClusterPhyId().equals(dto.getClusterId())) {
return Result.buildFromRSAndMsg(ResultStatus.OPERATION_FORBIDDEN, "备topic不允许操作");
}
ResultStatus rs = topicManagerService.modifyTopic(
relationDO.getStandbyClusterPhyId(),
relationDO.getStandbyResName(),
dto.getDescription(),
SpringTool.getUserName()
);
if (ResultStatus.SUCCESS.getCode() != rs.getCode()) {
return Result.buildFrom(rs);
}
}
ResultStatus resultStatus = topicManagerService.modifyTopic(
physicalClusterId,
dto.getTopicName(),
dto.getDescription(),
SpringTool.getUserName()
);
return Result.buildFrom(resultStatus);
}
@ApiOperation(value = "过期Topic信息", notes = "")
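Note: the pattern above — reject requests that address the standby topic, mirror the change to the standby, then apply it to the active topic — repeats in the quota and partition-expansion controllers further down. A minimal consolidated sketch, assuming only the HaASRelationManager/HaASRelationDO accessors already shown in this diff (the helper class itself is hypothetical, not part of the commit):

import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO;
import com.xiaojukeji.kafka.manager.service.biz.ha.HaASRelationManager;

// Hypothetical helper, not part of this commit: it consolidates the
// standby-topic guard that each controller repeats inline.
public class HaGuardSketch {
    private final HaASRelationManager haASRelationManager;

    public HaGuardSketch(HaASRelationManager haASRelationManager) {
        this.haASRelationManager = haASRelationManager;
    }

    public Result<HaASRelationDO> guardAndGetRelation(Long clusterId, String topicName) {
        HaASRelationDO relationDO = haASRelationManager.getASRelation(clusterId, topicName);
        if (relationDO != null && relationDO.getStandbyClusterPhyId().equals(clusterId)) {
            // requests addressed to the standby side are rejected outright
            return Result.buildFromRSAndMsg(ResultStatus.OPERATION_FORBIDDEN, "standby topic cannot be operated on directly");
        }
        // null data means the topic has no HA pairing; otherwise the caller mirrors
        // its change to getStandbyClusterPhyId()/getStandbyResName() first
        return Result.buildSuc(relationDO);
    }
}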

View File

@@ -1,13 +1,14 @@
package com.xiaojukeji.kafka.manager.web.api.versionone.op;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.dto.op.ControllerPreferredCandidateDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.rd.ClusterDTO;
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.service.biz.ha.HaClusterManager;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.web.converters.ClusterModelConverter;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
@@ -26,6 +27,9 @@ public class OpClusterController {
@Autowired
private ClusterService clusterService;
@Autowired
private HaClusterManager haClusterManager;
@ApiOperation(value = "接入集群")
@PostMapping(value = "clusters")
@ResponseBody
@@ -33,16 +37,14 @@ public class OpClusterController {
if (ValidateUtils.isNull(dto) || !dto.legal()) {
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
}
return haClusterManager.addNew(ClusterModelConverter.convert2ClusterDO(dto), dto.getActiveClusterId(), SpringTool.getUserName());
}
@ApiOperation(value = "删除集群")
@DeleteMapping(value = "clusters")
@ResponseBody
public Result delete(@RequestParam(value = "clusterId") Long clusterId) {
return haClusterManager.deleteById(clusterId, SpringTool.getUserName());
}
@ApiOperation(value = "修改集群信息")

View File

@@ -0,0 +1,87 @@
package com.xiaojukeji.kafka.manager.web.api.versionone.op;
import com.xiaojukeji.kafka.manager.common.bizenum.JobLogBizTypEnum;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ao.ha.job.HaJobState;
import com.xiaojukeji.kafka.manager.common.entity.dto.ha.ASSwitchJobActionDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.ha.ASSwitchJobDTO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.JobLogDO;
import com.xiaojukeji.kafka.manager.common.entity.vo.ha.job.HaJobDetailVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.rd.job.JobLogVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.rd.job.JobMulLogVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.ha.job.HaJobStateVO;
import com.xiaojukeji.kafka.manager.common.utils.ConvertUtil;
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
import com.xiaojukeji.kafka.manager.service.biz.job.HaASSwitchJobManager;
import com.xiaojukeji.kafka.manager.service.service.JobLogService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.util.ArrayList;
import java.util.List;
/**
* @author zengqiao
* @date 20/4/23
*/
@Api(tags = "OP-HA-主备切换Job相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V1_OP_PREFIX)
public class OpHaASSwitchJobController {
@Autowired
private JobLogService jobLogService;
@Autowired
private HaASSwitchJobManager haASSwitchJobManager;
@ApiOperation(value = "任务创建[ActiveStandbySwitch]")
@PostMapping(value = "as-switch-jobs")
@ResponseBody
public Result<Long> createJob(@Validated @RequestBody ASSwitchJobDTO dto) {
return haASSwitchJobManager.createJob(dto, SpringTool.getUserName());
}
@ApiOperation(value = "任务状态[ActiveStandbySwitch]", notes = "最近一个任务")
@GetMapping(value = "as-switch-jobs/{jobId}/job-state")
@ResponseBody
public Result<HaJobStateVO> jobState(@PathVariable Long jobId) {
Result<HaJobState> haResult = haASSwitchJobManager.jobState(jobId);
if (haResult.failed()) {
return Result.buildFromIgnoreData(haResult);
}
return Result.buildSuc(new HaJobStateVO(haResult.getData()));
}
@ApiOperation(value = "任务详情[ActiveStandbySwitch]", notes = "")
@GetMapping(value = "as-switch-jobs/{jobId}/job-detail")
@ResponseBody
public Result<List<HaJobDetailVO>> jobDetail(@PathVariable Long jobId) {
return haASSwitchJobManager.jobDetail(jobId);
}
@ApiOperation(value = "任务日志[ActiveStandbySwitch]", notes = "")
@GetMapping(value = "as-switch-jobs/{jobId}/job-logs")
@ResponseBody
public Result<JobMulLogVO> jobLog(@PathVariable Long jobId, @RequestParam(required = false) Long startLogId) {
List<JobLogDO> doList = jobLogService.listLogs(JobLogBizTypEnum.HA_SWITCH_JOB_LOG.getCode(), String.valueOf(jobId), startLogId);
List<JobLogVO> voList = doList.isEmpty() ? new ArrayList<>() : ConvertUtil.list2List(doList, JobLogVO.class);
return Result.buildSuc(new JobMulLogVO(voList, startLogId));
}
@ApiOperation(value = "任务操作[ActiveStandbySwitch]", notes = "")
@PutMapping(value = "as-switch-jobs/{jobId}/action")
@ResponseBody
public Result<Void> actionJob(@PathVariable Long jobId, @Validated @RequestBody ASSwitchJobActionDTO dto) {
return haASSwitchJobManager.actionJob(jobId, dto);
}
}
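A sketch of driving the new switch-job endpoints from a client; it assumes ApiPrefix.API_V1_OP_PREFIX resolves to /api/v1/op/ and uses illustrative JSON fields rather than the real ASSwitchJobDTO contract (Java 11's java.net.http is used for brevity):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class AsSwitchJobClientSketch {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // Create a switch job; the JSON fields are assumptions, see ASSwitchJobDTO for the real ones.
        HttpRequest create = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/api/v1/op/as-switch-jobs"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(
                        "{\"activeClusterPhyId\":1,\"standbyClusterPhyId\":2,\"topicNameList\":[\"order-events\"]}"))
                .build();
        System.out.println(client.send(create, HttpResponse.BodyHandlers.ofString()).body());

        // Poll the latest state of job 1 via the job-state endpoint.
        HttpRequest state = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/api/v1/op/as-switch-jobs/1/job-state"))
                .GET()
                .build();
        System.out.println(client.send(state, HttpResponse.BodyHandlers.ofString()).body());
    }
}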

View File

@@ -0,0 +1,130 @@
package com.xiaojukeji.kafka.manager.web.api.versionone.op;
import com.xiaojukeji.kafka.manager.common.bizenum.ha.HaResTypeEnum;
import com.xiaojukeji.kafka.manager.common.bizenum.ha.HaStatusEnum;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import com.xiaojukeji.kafka.manager.common.constant.KafkaConstant;
import com.xiaojukeji.kafka.manager.common.constant.MsgConstant;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO;
import com.xiaojukeji.kafka.manager.common.utils.ConvertUtil;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.service.service.ha.HaASRelationService;
import com.xiaojukeji.kafka.manager.service.utils.HaTopicCommands;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
/**
* @author zengqiao
* @date 20/4/23
*/
@Api(tags = "OP-HA-Relations维度相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V1_OP_PREFIX)
public class OpHaRelationsController {
private static final Logger LOGGER = LoggerFactory.getLogger(OpHaRelationsController.class);
@Autowired
private ClusterService clusterService;
@Autowired
private HaASRelationService haASRelationService;
@ApiOperation(value = "同步Kafka的HA关系到DB")
@PostMapping(value = "ha-relations/{clusterPhyId}/dest-db")
@ResponseBody
public Result<Void> syncHaRelationsToDB(@PathVariable Long clusterPhyId) {
// fetch the topic active/standby relation info from ZK
ClusterDO clusterDO = clusterService.getById(clusterPhyId);
if (ValidateUtils.isNull(clusterDO)) {
return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
}
Map<String, Properties> haTopicsConfigMap = HaTopicCommands.fetchAllHaTopicConfig(clusterDO);
if (haTopicsConfigMap == null) {
LOGGER.error("method=processTask||clusterPhyId={}||msg=fetch all ha topic config failed", clusterPhyId);
return Result.buildFailure(ResultStatus.ZOOKEEPER_READ_FAILED);
}
// build the HA relation list of the current cluster
List<HaASRelationDO> doList = haTopicsConfigMap.entrySet()
.stream()
.map(elem -> getHaASRelation(clusterPhyId, elem.getKey(), elem.getValue()))
.filter(relation -> relation != null)
.collect(Collectors.toList());
// refresh the HA relation table
Result<Void> rv = haASRelationService.replaceTopicRelationsToDB(clusterPhyId, doList);
if (rv.failed()) {
LOGGER.error("method=processTask||clusterPhyId={}||result={}||msg=replace topic relation failed", clusterPhyId, rv);
}
return rv;
}
// @ApiOperation(value = "同步DB的HA关系到Kafka")
// @PostMapping(value = "ha-relations/{clusterPhyId}/dest-kafka")
// @ResponseBody
// public Result<Void> syncHaRelationsToKafka(@PathVariable Long clusterPhyId) {
// // 从ZK获取Topic主备关系信息
// ClusterDO clusterDO = clusterService.getById(clusterPhyId);
// if (ValidateUtils.isNull(clusterDO)) {
// return Result.buildFromRSAndMsg(ResultStatus.CLUSTER_NOT_EXIST, MsgConstant.getClusterPhyNotExist(clusterPhyId));
// }
//
// Map<String, Properties> haTopicsConfigMap = HaTopicCommands.fetchAllHaTopicConfig(clusterDO);
// if (haTopicsConfigMap == null) {
// LOGGER.error("method=processTask||clusterPhyId={}||msg=fetch all ha topic config failed", clusterPhyId);
// return Result.buildFailure(ResultStatus.ZOOKEEPER_READ_FAILED);
// }
//
// // 获取当前集群的HA信息
// List<HaASRelationDO> doList = haTopicsConfigMap.entrySet()
// .stream()
// .map(elem -> getHaASRelation(clusterPhyId, elem.getKey(), elem.getValue()))
// .filter(relation -> relation != null)
// .collect(Collectors.toList());
//
// // 更新HA关系表
// Result<Void> rv = haASRelationService.replaceTopicRelationsToDB(clusterPhyId, doList);
// if (rv.failed()) {
// LOGGER.error("method=processTask||clusterPhyId={}||result={}||msg=replace topic relation failed", clusterPhyId, rv);
// }
//
// return rv;
// }
private HaASRelationDO getHaASRelation(Long standbyClusterPhyId, String standbyTopicName, Properties props) {
Long activeClusterPhyId = ConvertUtil.string2Long(props.getProperty(KafkaConstant.DIDI_HA_REMOTE_CLUSTER));
if (activeClusterPhyId == null) {
return null;
}
String activeTopicName = props.getProperty(KafkaConstant.DIDI_HA_REMOTE_TOPIC);
if (activeTopicName == null) {
activeTopicName = standbyTopicName;
}
return new HaASRelationDO(
activeClusterPhyId,
activeTopicName,
standbyClusterPhyId,
standbyTopicName,
HaResTypeEnum.TOPIC.getCode(),
HaStatusEnum.STABLE.getCode()
);
}
}
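For reference, a minimal sketch of the mapping getHaASRelation performs on a standby topic's config; the literal property keys are assumptions standing in for KafkaConstant.DIDI_HA_REMOTE_CLUSTER and KafkaConstant.DIDI_HA_REMOTE_TOPIC:

import java.util.Properties;

public class HaRelationMappingSketch {
    public static void main(String[] args) {
        // Assumed literal keys; the commit reads them via KafkaConstant.
        Properties props = new Properties();
        props.setProperty("didi.ha.remote.cluster", "1"); // id of the active (remote) cluster
        String standbyTopicName = "order-events";
        Long activeClusterPhyId = Long.valueOf(props.getProperty("didi.ha.remote.cluster"));
        // When the remote-topic key is absent, the active topic name defaults to
        // the standby topic name, exactly as getHaASRelation does above.
        String activeTopicName = props.getProperty("didi.ha.remote.topic", standbyTopicName);
        System.out.println(activeClusterPhyId + " / " + activeTopicName); // 1 / order-events
    }
}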

View File

@@ -0,0 +1,43 @@
package com.xiaojukeji.kafka.manager.web.api.versionone.op;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.TopicOperationResult;
import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.HaTopicRelationDTO;
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
import com.xiaojukeji.kafka.manager.service.biz.ha.HaTopicManager;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* HA (active/standby) topic operation APIs
* @author zengqiao
* @date 21/5/18
*/
@Api(tags = "OP-HA-Topic操作相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V1_OP_PREFIX)
public class OpHaTopicController {
@Autowired
private HaTopicManager haTopicManager;
@ApiOperation(value = "高可用Topic绑定")
@PostMapping(value = "ha-topics")
@ResponseBody
public Result<List<TopicOperationResult>> batchCreateHaTopic(@Validated @RequestBody HaTopicRelationDTO dto) {
return haTopicManager.batchCreateHaTopic(dto, SpringTool.getUserName());
}
@ApiOperation(value = "高可用topic解绑")
@DeleteMapping(value = "ha-topics")
@ResponseBody
public Result<List<TopicOperationResult>> batchRemoveHaTopic(@Validated @RequestBody HaTopicRelationDTO dto) {
return haTopicManager.batchRemoveHaTopic(dto, SpringTool.getUserName());
}
}
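A sketch of calling the bind endpoint; the URL prefix and JSON field names are assumptions (HaTopicRelationDTO defines the real contract):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class HaTopicBindSketch {
    public static void main(String[] args) throws Exception {
        // Field names are illustrative assumptions, not taken from HaTopicRelationDTO.
        String body = "{\"activeClusterId\":1,\"standbyClusterId\":2,\"topicNameList\":[\"order-events\"]}";
        HttpRequest bind = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8080/api/v1/op/ha-topics"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> resp = HttpClient.newHttpClient().send(bind, HttpResponse.BodyHandlers.ofString());
        System.out.println(resp.body()); // a Result<List<TopicOperationResult>> serialized as JSON
    }
}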

View File

@@ -5,7 +5,9 @@ import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.gateway.TopicQuota;
import com.xiaojukeji.kafka.manager.common.entity.dto.gateway.TopicQuotaDTO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.service.biz.ha.HaASRelationManager;
import com.xiaojukeji.kafka.manager.service.service.gateway.QuotaService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
@@ -24,6 +26,9 @@ public class OpQuotaController {
@Autowired
private QuotaService quotaService;
@Autowired
private HaASRelationManager haASRelationManager;
@ApiOperation(value = "配额调整",notes = "配额调整")
@RequestMapping(value = "topic-quotas",method = RequestMethod.POST)
@ResponseBody
@@ -32,6 +37,22 @@ public class OpQuotaController {
// non-null check
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
}
HaASRelationDO relationDO = haASRelationManager.getASRelation(dto.getClusterId(), dto.getTopicName());
if (relationDO != null) {
if (relationDO.getStandbyClusterPhyId().equals(dto.getClusterId())) {
return Result.buildFrom(ResultStatus.HA_TOPIC_DELETE_FORBIDDEN);
}
// adjust the standby topic's quota first
dto.setClusterId(relationDO.getStandbyClusterPhyId());
dto.setTopicName(relationDO.getStandbyResName());
ResultStatus resultStatus = quotaService.addTopicQuotaByAuthority(TopicQuota.buildFrom(dto));
if (ResultStatus.SUCCESS.getCode() != resultStatus.getCode()) {
return Result.buildFrom(resultStatus);
}
}
return Result.buildFrom(quotaService.addTopicQuotaByAuthority(TopicQuota.buildFrom(dto)));
}
}
}

View File

@@ -13,14 +13,17 @@ import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.TopicExpansionDTO
import com.xiaojukeji.kafka.manager.common.entity.dto.op.topic.TopicModificationDTO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO;
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.service.biz.ha.HaASRelationManager;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.AdminService;
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.service.service.TopicManagerService;
import com.xiaojukeji.kafka.manager.service.utils.TopicCommands;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
@@ -45,6 +48,9 @@ public class OpTopicController {
@Autowired
private TopicManagerService topicManagerService;
@Autowired
private HaASRelationManager haASRelationManager;
@ApiOperation(value = "创建Topic")
@RequestMapping(value = {"topics", "utils/topics"}, method = RequestMethod.POST)
@@ -109,28 +115,23 @@ public class OpTopicController {
@RequestMapping(value = {"topics", "utils/topics"}, method = RequestMethod.PUT)
@ResponseBody
public Result modifyTopic(@RequestBody TopicModificationDTO dto) {
if (!dto.paramLegal()) {
return Result.buildFrom(ResultStatus.PARAM_ILLEGAL);
}
Result rs = topicManagerService.modifyTopic(dto);
if (rs.failed()) {
return rs;
}
// modify the standby topic as well
HaASRelationDO relationDO = haASRelationManager.getASRelation(dto.getClusterId(), dto.getTopicName());
if (relationDO != null && relationDO.getActiveClusterPhyId().equals(dto.getClusterId())) {
dto.setClusterId(relationDO.getStandbyClusterPhyId());
dto.setTopicName(relationDO.getStandbyResName());
rs = topicManagerService.modifyTopic(dto);
}
return rs;
}
@ApiOperation(value = "Topic扩分区", notes = "")
@@ -143,22 +144,31 @@ public class OpTopicController {
List<TopicOperationResult> resultList = new ArrayList<>();
for (TopicExpansionDTO dto: dtoList) {
TopicOperationResult result;
HaASRelationDO relationDO = haASRelationManager.getASRelation(dto.getClusterId(), dto.getTopicName());
if (relationDO != null) {
// users may not operate on the standby topic directly
if (relationDO.getStandbyClusterPhyId().equals(dto.getClusterId())) {
resultList.add(TopicOperationResult.buildFrom(dto.getClusterId(),
dto.getTopicName(),
ResultStatus.OPERATION_FORBIDDEN));
continue;
}
// expand the standby topic's partitions first
TopicExpansionDTO standbyDto = new TopicExpansionDTO();
BeanUtils.copyProperties(dto, standbyDto);
standbyDto.setClusterId(relationDO.getStandbyClusterPhyId());
standbyDto.setTopicName(relationDO.getStandbyResName());
standbyDto.setBrokerIdList(PhysicalClusterMetadataManager.getBrokerIdList(relationDO.getStandbyClusterPhyId()));
standbyDto.setRegionId(null);
result = topicManagerService.expandTopic(standbyDto);
if (ResultStatus.SUCCESS.getCode() != result.getCode()) {
resultList.add(result);
continue;
}
}
resultList.add(topicManagerService.expandTopic(dto));
}
for (TopicOperationResult operationResult: resultList) {
@@ -178,6 +188,12 @@ public class OpTopicController {
if (ValidateUtils.isNull(clusterDO)) {
return Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST);
}
HaASRelationDO relationDO = haASRelationManager.getASRelation(dto.getClusterId(), dto.getTopicName());
if (relationDO != null) {
return Result.buildFrom(ResultStatus.HA_TOPIC_DELETE_FORBIDDEN);
}
return new Result<>(clusterDO);
}
}

View File

@@ -2,7 +2,10 @@ package com.xiaojukeji.kafka.manager.web.api.versionone.rd;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.AppDTO;
import com.xiaojukeji.kafka.manager.common.entity.dto.rd.AppRelateTopicsDTO;
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.app.AppVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.rd.app.AppRelateTopicsVO;
import com.xiaojukeji.kafka.manager.service.biz.ha.HaAppManager;
import com.xiaojukeji.kafka.manager.service.service.gateway.AppService;
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
@@ -10,6 +13,7 @@ import com.xiaojukeji.kafka.manager.web.converters.AppConverter;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.*;
import java.util.List;
@@ -25,6 +29,9 @@ public class RdAppController {
@Autowired
private AppService appService;
@Autowired
private HaAppManager haAppManager;
@ApiOperation(value = "App列表", notes = "")
@RequestMapping(value = "apps", method = RequestMethod.GET)
@ResponseBody
@@ -40,4 +47,11 @@ public class RdAppController {
appService.updateByAppId(dto, SpringTool.getUserName(), true)
);
}
@ApiOperation(value = "App关联Topic信息查询", notes = "")
@PostMapping(value = "apps/relate-topics")
@ResponseBody
public Result<List<AppRelateTopicsVO>> appRelateTopics(@Validated @RequestBody AppRelateTopicsDTO dto) {
return haAppManager.appRelateTopics(dto.getClusterPhyId(), dto.getFilterTopicNameList());
}
}

View File

@@ -1,27 +1,28 @@
package com.xiaojukeji.kafka.manager.web.api.versionone.rd;
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ao.BrokerOverviewDTO;
import com.xiaojukeji.kafka.manager.common.entity.ao.cluster.ControllerPreferredCandidate;
import com.xiaojukeji.kafka.manager.common.entity.pojo.RegionDO;
import com.xiaojukeji.kafka.manager.common.entity.vo.common.BrokerOverviewVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.common.RealTimeMetricsVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.common.TopicOverviewVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.common.TopicThrottleVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.cluster.TopicMetadataVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.rd.KafkaControllerVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.rd.cluster.ClusterBrokerStatusVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.rd.cluster.ClusterDetailVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.rd.cluster.ControllerPreferredCandidateVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.rd.cluster.RdClusterMetricsVO;
import com.xiaojukeji.kafka.manager.common.utils.DateUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.service.*;
import com.xiaojukeji.kafka.manager.web.converters.ClusterModelConverter;
import com.xiaojukeji.kafka.manager.web.converters.CommonModelConverter;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;

View File

@@ -0,0 +1,55 @@
package com.xiaojukeji.kafka.manager.web.api.versionone.rd;
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.vo.ha.HaClusterTopicVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.ha.HaClusterVO;
import com.xiaojukeji.kafka.manager.common.entity.vo.normal.topic.HaClusterTopicHaStatusVO;
import com.xiaojukeji.kafka.manager.service.biz.ha.HaASRelationManager;
import com.xiaojukeji.kafka.manager.service.service.ha.HaClusterService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.List;
/**
* @author zengqiao
* @date 20/4/23
*/
@Api(tags = "RD-HA-Cluster维度相关接口(REST)")
@RestController
@RequestMapping(ApiPrefix.API_V1_RD_PREFIX)
public class RdHaClusterController {
@Autowired
private HaASRelationManager haASRelationManager;
@Autowired
private HaClusterService haClusterService;
@ApiOperation(value = "集群-主备Topic列表", notes = "如果传入secondClusterId则主备关系必须是firstClusterId与secondClusterId的Topic")
@GetMapping(value = "clusters/{firstClusterId}/ha-topics")
@ResponseBody
public Result<List<HaClusterTopicVO>> getHATopics(@PathVariable Long firstClusterId,
@RequestParam(required = false) Long secondClusterId,
@RequestParam(required = false, defaultValue = "true") Boolean filterSystemTopics) {
return Result.buildSuc(haASRelationManager.getHATopics(firstClusterId, secondClusterId, filterSystemTopics != null && filterSystemTopics));
}
@ApiOperation(value = "集群基本信息列表", notes = "含高可用集群信息")
@GetMapping(value = "clusters/ha/basic-info")
@ResponseBody
public Result<List<HaClusterVO>> getClusterBasicInfo() {
return haClusterService.listAllHA();
}
@ApiOperation(value = "集群Topic高可用状态信息", notes = "")
@GetMapping(value = "clusters/{firstClusterId}/ha-topics/status")
@ResponseBody
public Result<List<HaClusterTopicHaStatusVO>> listHaStatusTopics(@PathVariable Long firstClusterId,
@RequestParam(required = false, defaultValue = "true") Boolean checkMetadata) {
return haASRelationManager.listHaStatusTopics(firstClusterId, checkMetadata);
}
}

View File

@@ -1,8 +1,13 @@
package com.xiaojukeji.kafka.manager.web.config;
import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.core.config.GlobalConfig;
import com.baomidou.mybatisplus.extension.plugins.PaginationInterceptor;
import com.baomidou.mybatisplus.extension.spring.MybatisSqlSessionFactoryBean;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
@@ -19,6 +24,7 @@ import javax.sql.DataSource;
* @date 20/3/17
*/
@Configuration
@MapperScan("com.xiaojukeji.kafka.manager.dao.ha")
public class DataSourceConfig {
@Bean(name = "dataSource")
@ConfigurationProperties(prefix = "spring.datasource.kafka-manager")
@@ -30,10 +36,15 @@ public class DataSourceConfig {
@Bean(name = "sqlSessionFactory")
@Primary
public SqlSessionFactory sqlSessionFactory(@Qualifier("dataSource") DataSource dataSource) throws Exception {
MybatisSqlSessionFactoryBean bean = new MybatisSqlSessionFactoryBean();
bean.setDataSource(dataSource);
bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/*.xml"));
bean.setConfigLocation(new PathMatchingResourcePatternResolver().getResource("classpath:mybatis-config.xml"));
bean.setGlobalConfig(globalConfig());
// register the pagination plugin; without it, pagination does not take effect
bean.setPlugins(paginationInterceptor());
return bean.getObject();
}
@@ -48,4 +59,21 @@ public class DataSourceConfig {
public SqlSessionTemplate sqlSessionTemplate(@Qualifier("sqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
return new SqlSessionTemplate(sqlSessionFactory);
}
@Bean
public GlobalConfig globalConfig() {
GlobalConfig globalConfig = new GlobalConfig();
globalConfig.setBanner(false);
GlobalConfig.DbConfig dbConfig = new GlobalConfig.DbConfig();
dbConfig.setIdType(IdType.AUTO);
globalConfig.setDbConfig(dbConfig);
return globalConfig;
}
@Bean
public PaginationInterceptor paginationInterceptor() {
PaginationInterceptor page = new PaginationInterceptor();
page.setDbType(DbType.MYSQL);
return page;
}
}
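With MybatisSqlSessionFactoryBean plus the PaginationInterceptor registered above, MyBatis-Plus mappers under com.xiaojukeji.kafka.manager.dao.ha can page results. A minimal sketch, with a hypothetical mapper interface standing in for the real HA DAOs:

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ha.HaASRelationDO;

// Hypothetical mapper; the real HA DAOs live under com.xiaojukeji.kafka.manager.dao.ha.
interface HaASRelationMapperSketch extends BaseMapper<HaASRelationDO> {
}

class PageQuerySketch {
    void demo(HaASRelationMapperSketch mapper) {
        // Page 1, 10 rows per page; the PaginationInterceptor rewrites the SQL with LIMIT/OFFSET.
        Page<HaASRelationDO> page = mapper.selectPage(new Page<>(1, 10), null);
        System.out.println(page.getTotal() + " relations in total, " + page.getRecords().size() + " on this page");
    }
}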

View File

@@ -30,6 +30,7 @@ import com.xiaojukeji.kafka.manager.common.entity.pojo.ControllerDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.RegionDO;
import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.utils.MetricsConvertUtils;
import org.springframework.beans.BeanUtils;
import java.util.*;
@@ -89,7 +90,7 @@ public class ClusterModelConverter {
return null;
}
ClusterDetailVO vo = new ClusterDetailVO();
BeanUtils.copyProperties(dto, vo);
if (ValidateUtils.isNull(vo.getRegionNum())) {
vo.setRegionNum(0);
}

View File

@@ -39,6 +39,7 @@ public class TopicModelConverter {
vo.setDescription(dto.getDescription());
vo.setBootstrapServers("");
vo.setRegionNameList(dto.getRegionNameList());
vo.setHaRelation(dto.getHaRelation());
if (!ValidateUtils.isNull(clusterDO)) {
vo.setBootstrapServers(clusterDO.getBootstrapServers());
}

View File

@@ -0,0 +1,47 @@
package com.xiaojukeji.kafka.manager.web.handler;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.utils.ConvertUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.validation.FieldError;
import org.springframework.web.bind.MethodArgumentNotValidException;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.RestControllerAdvice;
import java.util.List;
import java.util.stream.Collectors;
@RestControllerAdvice
public class CustomGlobalExceptionHandler {
private static final Logger LOGGER = LoggerFactory.getLogger(CustomGlobalExceptionHandler.class);
/**
* Handle parameter-validation exceptions and return the field error messages
* @param me the validation exception
* @return a PARAM_ILLEGAL result carrying the concatenated messages
*/
@ExceptionHandler(MethodArgumentNotValidException.class)
public Result<Void> methodArgumentNotValidException(MethodArgumentNotValidException me) {
List<FieldError> fieldErrorList = me.getBindingResult().getFieldErrors();
List<String> errorList = fieldErrorList.stream().map(elem -> elem.getDefaultMessage()).collect(Collectors.toList());
return Result.buildFromRSAndMsg(ResultStatus.PARAM_ILLEGAL, ConvertUtil.list2String(errorList, ","));
}
@ExceptionHandler(NullPointerException.class)
public Result<Void> handleNullPointerException(Exception e) {
LOGGER.error("method=handleNullPointerException||errMsg=exception", e);
return Result.buildFromRSAndMsg(ResultStatus.FAIL, "服务空指针异常");
}
@ExceptionHandler(Exception.class)
public Result<Void> handleException(Exception e) {
LOGGER.error("method=handleException||errMsg=exception", e);
return Result.buildFromRSAndMsg(ResultStatus.FAIL, e.getMessage());
}
}
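The MethodArgumentNotValidException branch above is what the spring-boot-starter-validation dependency (added in the pom.xml hunk earlier) feeds: constraint violations on @Validated @RequestBody DTOs end up as a PARAM_ILLEGAL Result. An illustrative DTO, not taken from the commit:

import javax.validation.constraints.Min;
import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;

// Illustrative DTO (not from this commit). Posting {"clusterPhyId":null} to a
// @Validated @RequestBody endpoint raises MethodArgumentNotValidException, which
// the handler above turns into Result(PARAM_ILLEGAL, "clusterPhyId may not be null").
public class DemoValidatedDTO {
    @NotNull(message = "clusterPhyId may not be null")
    @Min(value = 0, message = "clusterPhyId must be >= 0")
    private Long clusterPhyId;

    @NotBlank(message = "topicName may not be blank")
    private String topicName;

    // getters and setters omitted for brevity
}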

View File

@@ -13,9 +13,9 @@ spring:
active: dev
datasource:
kafka-manager:
jdbc-url: jdbc:mysql://localhost:3306/logi_kafka_manager?characterEncoding=UTF-8&useSSL=false&serverTimezone=GMT%2B8
username: root
password: 123456
driver-class-name: com.mysql.cj.jdbc.Driver
main:
allow-bean-definition-overriding: true
@@ -127,3 +127,6 @@ notify:
topic-name: didi-kafka-notify
order:
detail-url: http://127.0.0.1
d-kafka:
gateway-zk: 127.0.0.1:2181/sd