From 70c237da72791ac39b39052826d70cf64a24c87e Mon Sep 17 00:00:00 2001 From: 17hao Date: Sun, 7 Feb 2021 13:23:22 +0800 Subject: [PATCH 1/2] Tracking changes applied to Kafka cluster --- .../service/service/ClusterService.java | 2 +- .../service/impl/ClusterServiceImpl.java | 20 ++++++++++- .../service/utils/ChangeTrackingUtils.java | 35 +++++++++++++++++++ .../versionone/op/OpClusterController.java | 2 +- 4 files changed, 56 insertions(+), 3 deletions(-) create mode 100644 kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ChangeTrackingUtils.java diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ClusterService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ClusterService.java index 004a3f51..b2c5f7b2 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ClusterService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/ClusterService.java @@ -43,7 +43,7 @@ public interface ClusterService { ClusterNameDTO getClusterName(Long logicClusterId); - ResultStatus deleteById(Long clusterId); + ResultStatus deleteById(Long clusterId, String operator); /** * 获取优先被选举为controller的broker diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java index fd28308d..74640e2a 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java @@ -1,6 +1,8 @@ package com.xiaojukeji.kafka.manager.service.service.impl; import com.xiaojukeji.kafka.manager.common.bizenum.DBStatusEnum; +import com.xiaojukeji.kafka.manager.common.bizenum.ModuleEnum; +import com.xiaojukeji.kafka.manager.common.bizenum.OperateEnum; import com.xiaojukeji.kafka.manager.common.entity.Result; import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; import com.xiaojukeji.kafka.manager.common.entity.ao.ClusterDetailDTO; @@ -19,6 +21,7 @@ import com.xiaojukeji.kafka.manager.service.service.ClusterService; import com.xiaojukeji.kafka.manager.service.service.ConsumerService; import com.xiaojukeji.kafka.manager.service.service.RegionService; import com.xiaojukeji.kafka.manager.service.service.ZookeeperService; +import com.xiaojukeji.kafka.manager.service.utils.ChangeTrackingUtils; import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; @@ -65,6 +68,9 @@ public class ClusterServiceImpl implements ClusterService { @Autowired private ZookeeperService zookeeperService; + @Autowired + private ChangeTrackingUtils changeTrackingUtils; + @Override public ResultStatus addNew(ClusterDO clusterDO, String operator) { if (ValidateUtils.isNull(clusterDO) || ValidateUtils.isNull(operator)) { @@ -74,6 +80,11 @@ public class ClusterServiceImpl implements ClusterService { return ResultStatus.ZOOKEEPER_CONNECT_FAILED; } try { + Map content = new HashMap<>(); + content.put("zk address", clusterDO.getZookeeper()); + content.put("bootstrap servers", clusterDO.getBootstrapServers()); + content.put("security properties", clusterDO.getSecurityProperties()); + changeTrackingUtils.saveOperateRecord(operator, ModuleEnum.CLUSTER, clusterDO.getClusterName(), OperateEnum.ADD, content); if (clusterDao.insert(clusterDO) <= 0) { LOGGER.error("add new cluster failed, clusterDO:{}.", clusterDO); return ResultStatus.MYSQL_ERROR; @@ -104,6 +115,10 @@ public class ClusterServiceImpl implements ClusterService { return ResultStatus.CHANGE_ZOOKEEPER_FORBIDDEN; } clusterDO.setStatus(originClusterDO.getStatus()); + Map content = new HashMap<>(); + content.put("cluster id", clusterDO.getId().toString()); + content.put("security properties", clusterDO.getSecurityProperties()); + changeTrackingUtils.saveOperateRecord(operator, ModuleEnum.CLUSTER, clusterDO.getClusterName(), OperateEnum.EDIT, content); return updateById(clusterDO); } @@ -254,12 +269,15 @@ public class ClusterServiceImpl implements ClusterService { } @Override - public ResultStatus deleteById(Long clusterId) { + public ResultStatus deleteById(Long clusterId, String operator) { List regionDOList = regionService.getByClusterId(clusterId); if (!ValidateUtils.isEmptyList(regionDOList)) { return ResultStatus.OPERATION_FORBIDDEN; } try { + Map content = new HashMap<>(); + content.put("cluster id", clusterId.toString()); + changeTrackingUtils.saveOperateRecord(operator, ModuleEnum.CLUSTER, getClusterName(clusterId).getPhysicalClusterName(), OperateEnum.DELETE, content); if (clusterDao.deleteById(clusterId) <= 0) { LOGGER.error("delete cluster failed, clusterId:{}.", clusterId); return ResultStatus.MYSQL_ERROR; diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ChangeTrackingUtils.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ChangeTrackingUtils.java new file mode 100644 index 00000000..3fac4ffc --- /dev/null +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ChangeTrackingUtils.java @@ -0,0 +1,35 @@ +package com.xiaojukeji.kafka.manager.service.utils; + +import com.xiaojukeji.kafka.manager.common.bizenum.ModuleEnum; +import com.xiaojukeji.kafka.manager.common.bizenum.OperateEnum; +import com.xiaojukeji.kafka.manager.common.entity.pojo.OperateRecordDO; +import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; +import com.xiaojukeji.kafka.manager.service.service.OperateRecordService; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.Map; + +/** + * Track changes applied to Kafka. + */ +public class ChangeTrackingUtils { + private final OperateRecordService operateRecordService; + + @Autowired + public ChangeTrackingUtils(OperateRecordService operateRecordService) { + this.operateRecordService = operateRecordService; + } + + /** + * Saving operate record to database. + */ + public void saveOperateRecord(String operator, ModuleEnum module, String resourceName, OperateEnum operate, Map content) { + OperateRecordDO operateRecordDO = new OperateRecordDO(); + operateRecordDO.setOperator(operator); + operateRecordDO.setModuleId(module.getCode()); + operateRecordDO.setResource(resourceName); + operateRecordDO.setOperateId(operate.getCode()); + operateRecordDO.setContent(JsonUtils.toJSONString(content)); + operateRecordService.insert(operateRecordDO); + } +} diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpClusterController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpClusterController.java index 21547aa9..07b7dbc4 100644 --- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpClusterController.java +++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/op/OpClusterController.java @@ -43,7 +43,7 @@ public class OpClusterController { @RequestMapping(value = "clusters", method = RequestMethod.DELETE) @ResponseBody public Result delete(@RequestParam(value = "clusterId") Long clusterId) { - return Result.buildFrom(clusterService.deleteById(clusterId)); + return Result.buildFrom(clusterService.deleteById(clusterId, SpringTool.getUserName())); } @ApiOperation(value = "修改集群信息") From 832320abc616cd12122142150c073079d80d0211 Mon Sep 17 00:00:00 2001 From: 17hao Date: Sun, 7 Feb 2021 14:20:57 +0800 Subject: [PATCH 2/2] Improve code's cohesion && save jmx properties --- .../service/service/OperateRecordService.java | 5 +++ .../service/impl/ClusterServiceImpl.java | 16 ++++----- .../impl/OperateRecordServiceImpl.java | 15 ++++++++ .../service/utils/ChangeTrackingUtils.java | 35 ------------------- 4 files changed, 27 insertions(+), 44 deletions(-) delete mode 100644 kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ChangeTrackingUtils.java diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/OperateRecordService.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/OperateRecordService.java index c5007ac6..5b2909ca 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/OperateRecordService.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/OperateRecordService.java @@ -1,9 +1,12 @@ package com.xiaojukeji.kafka.manager.service.service; +import com.xiaojukeji.kafka.manager.common.bizenum.ModuleEnum; +import com.xiaojukeji.kafka.manager.common.bizenum.OperateEnum; import com.xiaojukeji.kafka.manager.common.entity.dto.rd.OperateRecordDTO; import com.xiaojukeji.kafka.manager.common.entity.pojo.OperateRecordDO; import java.util.List; +import java.util.Map; /** * @author zhongyuankai @@ -12,5 +15,7 @@ import java.util.List; public interface OperateRecordService { int insert(OperateRecordDO operateRecordDO); + int insert(String operator, ModuleEnum module, String resourceName, OperateEnum operate, Map content); + List queryByCondt(OperateRecordDTO dto); } diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java index 74640e2a..609c8cf9 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/ClusterServiceImpl.java @@ -17,11 +17,7 @@ import com.xiaojukeji.kafka.manager.dao.ClusterMetricsDao; import com.xiaojukeji.kafka.manager.dao.ControllerDao; import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager; import com.xiaojukeji.kafka.manager.service.cache.PhysicalClusterMetadataManager; -import com.xiaojukeji.kafka.manager.service.service.ClusterService; -import com.xiaojukeji.kafka.manager.service.service.ConsumerService; -import com.xiaojukeji.kafka.manager.service.service.RegionService; -import com.xiaojukeji.kafka.manager.service.service.ZookeeperService; -import com.xiaojukeji.kafka.manager.service.utils.ChangeTrackingUtils; +import com.xiaojukeji.kafka.manager.service.service.*; import com.xiaojukeji.kafka.manager.service.utils.ConfigUtils; import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; @@ -69,7 +65,7 @@ public class ClusterServiceImpl implements ClusterService { private ZookeeperService zookeeperService; @Autowired - private ChangeTrackingUtils changeTrackingUtils; + private OperateRecordService operateRecordService; @Override public ResultStatus addNew(ClusterDO clusterDO, String operator) { @@ -84,7 +80,8 @@ public class ClusterServiceImpl implements ClusterService { content.put("zk address", clusterDO.getZookeeper()); content.put("bootstrap servers", clusterDO.getBootstrapServers()); content.put("security properties", clusterDO.getSecurityProperties()); - changeTrackingUtils.saveOperateRecord(operator, ModuleEnum.CLUSTER, clusterDO.getClusterName(), OperateEnum.ADD, content); + content.put("jmx properties", clusterDO.getJmxProperties()); + operateRecordService.insert(operator, ModuleEnum.CLUSTER, clusterDO.getClusterName(), OperateEnum.ADD, content); if (clusterDao.insert(clusterDO) <= 0) { LOGGER.error("add new cluster failed, clusterDO:{}.", clusterDO); return ResultStatus.MYSQL_ERROR; @@ -118,7 +115,8 @@ public class ClusterServiceImpl implements ClusterService { Map content = new HashMap<>(); content.put("cluster id", clusterDO.getId().toString()); content.put("security properties", clusterDO.getSecurityProperties()); - changeTrackingUtils.saveOperateRecord(operator, ModuleEnum.CLUSTER, clusterDO.getClusterName(), OperateEnum.EDIT, content); + content.put("jmx properties", clusterDO.getJmxProperties()); + operateRecordService.insert(operator, ModuleEnum.CLUSTER, clusterDO.getClusterName(), OperateEnum.EDIT, content); return updateById(clusterDO); } @@ -277,7 +275,7 @@ public class ClusterServiceImpl implements ClusterService { try { Map content = new HashMap<>(); content.put("cluster id", clusterId.toString()); - changeTrackingUtils.saveOperateRecord(operator, ModuleEnum.CLUSTER, getClusterName(clusterId).getPhysicalClusterName(), OperateEnum.DELETE, content); + operateRecordService.insert(operator, ModuleEnum.CLUSTER, getClusterName(clusterId).getPhysicalClusterName(), OperateEnum.DELETE, content); if (clusterDao.deleteById(clusterId) <= 0) { LOGGER.error("delete cluster failed, clusterId:{}.", clusterId); return ResultStatus.MYSQL_ERROR; diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/OperateRecordServiceImpl.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/OperateRecordServiceImpl.java index 47702eaa..290bbae5 100644 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/OperateRecordServiceImpl.java +++ b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/service/impl/OperateRecordServiceImpl.java @@ -1,7 +1,10 @@ package com.xiaojukeji.kafka.manager.service.service.impl; +import com.xiaojukeji.kafka.manager.common.bizenum.ModuleEnum; +import com.xiaojukeji.kafka.manager.common.bizenum.OperateEnum; import com.xiaojukeji.kafka.manager.common.entity.dto.rd.OperateRecordDTO; import com.xiaojukeji.kafka.manager.common.entity.pojo.OperateRecordDO; +import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils; import com.xiaojukeji.kafka.manager.dao.OperateRecordDao; import com.xiaojukeji.kafka.manager.service.service.OperateRecordService; @@ -10,6 +13,7 @@ import org.springframework.stereotype.Service; import java.util.Date; import java.util.List; +import java.util.Map; /** * @author zhongyuankai @@ -25,6 +29,17 @@ public class OperateRecordServiceImpl implements OperateRecordService { return operateRecordDao.insert(operateRecordDO); } + @Override + public int insert(String operator, ModuleEnum module, String resourceName, OperateEnum operate, Map content) { + OperateRecordDO operateRecordDO = new OperateRecordDO(); + operateRecordDO.setOperator(operator); + operateRecordDO.setModuleId(module.getCode()); + operateRecordDO.setResource(resourceName); + operateRecordDO.setOperateId(operate.getCode()); + operateRecordDO.setContent(JsonUtils.toJSONString(content)); + return insert(operateRecordDO); + } + @Override public List queryByCondt(OperateRecordDTO dto) { return operateRecordDao.queryByCondt( diff --git a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ChangeTrackingUtils.java b/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ChangeTrackingUtils.java deleted file mode 100644 index 3fac4ffc..00000000 --- a/kafka-manager-core/src/main/java/com/xiaojukeji/kafka/manager/service/utils/ChangeTrackingUtils.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.xiaojukeji.kafka.manager.service.utils; - -import com.xiaojukeji.kafka.manager.common.bizenum.ModuleEnum; -import com.xiaojukeji.kafka.manager.common.bizenum.OperateEnum; -import com.xiaojukeji.kafka.manager.common.entity.pojo.OperateRecordDO; -import com.xiaojukeji.kafka.manager.common.utils.JsonUtils; -import com.xiaojukeji.kafka.manager.service.service.OperateRecordService; -import org.springframework.beans.factory.annotation.Autowired; - -import java.util.Map; - -/** - * Track changes applied to Kafka. - */ -public class ChangeTrackingUtils { - private final OperateRecordService operateRecordService; - - @Autowired - public ChangeTrackingUtils(OperateRecordService operateRecordService) { - this.operateRecordService = operateRecordService; - } - - /** - * Saving operate record to database. - */ - public void saveOperateRecord(String operator, ModuleEnum module, String resourceName, OperateEnum operate, Map content) { - OperateRecordDO operateRecordDO = new OperateRecordDO(); - operateRecordDO.setOperator(operator); - operateRecordDO.setModuleId(module.getCode()); - operateRecordDO.setResource(resourceName); - operateRecordDO.setOperateId(operate.getCode()); - operateRecordDO.setContent(JsonUtils.toJSONString(content)); - operateRecordService.insert(operateRecordDO); - } -}