mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 03:42:07 +08:00
Tracking changes applied to Kafka cluster
This commit is contained in:
@@ -43,7 +43,7 @@ public interface ClusterService {
|
||||
|
||||
ClusterNameDTO getClusterName(Long logicClusterId);
|
||||
|
||||
ResultStatus deleteById(Long clusterId);
|
||||
ResultStatus deleteById(Long clusterId, String operator);
|
||||
|
||||
/**
|
||||
* 获取优先被选举为controller的broker
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
package com.xiaojukeji.kafka.manager.service.service.impl;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.DBStatusEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.ModuleEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.OperateEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ao.ClusterDetailDTO;
|
||||
@@ -19,6 +21,7 @@ import com.xiaojukeji.kafka.manager.service.service.ClusterService;
|
||||
import com.xiaojukeji.kafka.manager.service.service.ConsumerService;
|
||||
import com.xiaojukeji.kafka.manager.service.service.RegionService;
|
||||
import com.xiaojukeji.kafka.manager.service.service.ZookeeperService;
|
||||
import com.xiaojukeji.kafka.manager.service.utils.ChangeTrackingUtils;
|
||||
import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.slf4j.Logger;
|
||||
@@ -65,6 +68,9 @@ public class ClusterServiceImpl implements ClusterService {
|
||||
@Autowired
|
||||
private ZookeeperService zookeeperService;
|
||||
|
||||
@Autowired
|
||||
private ChangeTrackingUtils changeTrackingUtils;
|
||||
|
||||
@Override
|
||||
public ResultStatus addNew(ClusterDO clusterDO, String operator) {
|
||||
if (ValidateUtils.isNull(clusterDO) || ValidateUtils.isNull(operator)) {
|
||||
@@ -74,6 +80,11 @@ public class ClusterServiceImpl implements ClusterService {
|
||||
return ResultStatus.ZOOKEEPER_CONNECT_FAILED;
|
||||
}
|
||||
try {
|
||||
Map<String, String> content = new HashMap<>();
|
||||
content.put("zk address", clusterDO.getZookeeper());
|
||||
content.put("bootstrap servers", clusterDO.getBootstrapServers());
|
||||
content.put("security properties", clusterDO.getSecurityProperties());
|
||||
changeTrackingUtils.saveOperateRecord(operator, ModuleEnum.CLUSTER, clusterDO.getClusterName(), OperateEnum.ADD, content);
|
||||
if (clusterDao.insert(clusterDO) <= 0) {
|
||||
LOGGER.error("add new cluster failed, clusterDO:{}.", clusterDO);
|
||||
return ResultStatus.MYSQL_ERROR;
|
||||
@@ -104,6 +115,10 @@ public class ClusterServiceImpl implements ClusterService {
|
||||
return ResultStatus.CHANGE_ZOOKEEPER_FORBIDDEN;
|
||||
}
|
||||
clusterDO.setStatus(originClusterDO.getStatus());
|
||||
Map<String, String> content = new HashMap<>();
|
||||
content.put("cluster id", clusterDO.getId().toString());
|
||||
content.put("security properties", clusterDO.getSecurityProperties());
|
||||
changeTrackingUtils.saveOperateRecord(operator, ModuleEnum.CLUSTER, clusterDO.getClusterName(), OperateEnum.EDIT, content);
|
||||
return updateById(clusterDO);
|
||||
}
|
||||
|
||||
@@ -254,12 +269,15 @@ public class ClusterServiceImpl implements ClusterService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public ResultStatus deleteById(Long clusterId) {
|
||||
public ResultStatus deleteById(Long clusterId, String operator) {
|
||||
List<RegionDO> regionDOList = regionService.getByClusterId(clusterId);
|
||||
if (!ValidateUtils.isEmptyList(regionDOList)) {
|
||||
return ResultStatus.OPERATION_FORBIDDEN;
|
||||
}
|
||||
try {
|
||||
Map<String, String> content = new HashMap<>();
|
||||
content.put("cluster id", clusterId.toString());
|
||||
changeTrackingUtils.saveOperateRecord(operator, ModuleEnum.CLUSTER, getClusterName(clusterId).getPhysicalClusterName(), OperateEnum.DELETE, content);
|
||||
if (clusterDao.deleteById(clusterId) <= 0) {
|
||||
LOGGER.error("delete cluster failed, clusterId:{}.", clusterId);
|
||||
return ResultStatus.MYSQL_ERROR;
|
||||
|
||||
@@ -0,0 +1,35 @@
|
||||
package com.xiaojukeji.kafka.manager.service.utils;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.ModuleEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.OperateEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.OperateRecordDO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
|
||||
import com.xiaojukeji.kafka.manager.service.service.OperateRecordService;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* Track changes applied to Kafka.
|
||||
*/
|
||||
public class ChangeTrackingUtils {
|
||||
private final OperateRecordService operateRecordService;
|
||||
|
||||
@Autowired
|
||||
public ChangeTrackingUtils(OperateRecordService operateRecordService) {
|
||||
this.operateRecordService = operateRecordService;
|
||||
}
|
||||
|
||||
/**
|
||||
* Saving operate record to database.
|
||||
*/
|
||||
public void saveOperateRecord(String operator, ModuleEnum module, String resourceName, OperateEnum operate, Map<String, String> content) {
|
||||
OperateRecordDO operateRecordDO = new OperateRecordDO();
|
||||
operateRecordDO.setOperator(operator);
|
||||
operateRecordDO.setModuleId(module.getCode());
|
||||
operateRecordDO.setResource(resourceName);
|
||||
operateRecordDO.setOperateId(operate.getCode());
|
||||
operateRecordDO.setContent(JsonUtils.toJSONString(content));
|
||||
operateRecordService.insert(operateRecordDO);
|
||||
}
|
||||
}
|
||||
@@ -43,7 +43,7 @@ public class OpClusterController {
|
||||
@RequestMapping(value = "clusters", method = RequestMethod.DELETE)
|
||||
@ResponseBody
|
||||
public Result delete(@RequestParam(value = "clusterId") Long clusterId) {
|
||||
return Result.buildFrom(clusterService.deleteById(clusterId));
|
||||
return Result.buildFrom(clusterService.deleteById(clusterId, SpringTool.getUserName()));
|
||||
}
|
||||
|
||||
@ApiOperation(value = "修改集群信息")
|
||||
|
||||
Reference in New Issue
Block a user