From ecf6e8f664a110e4184b0c390715f4d0c6572183 Mon Sep 17 00:00:00 2001 From: didi <1643482336@qq.com> Date: Mon, 27 Dec 2021 14:55:35 +0800 Subject: [PATCH] =?UTF-8?q?ConfigService,OperateRecordService,RegionServic?= =?UTF-8?q?e,ThrottleService,TopicExpiredService,TopicManagerService?= =?UTF-8?q?=E6=8E=A5=E5=8F=A3=E4=B8=8B=E7=9A=84=E5=8D=95=E5=85=83=E6=B5=8B?= =?UTF-8?q?=E8=AF=95=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/service/ConfigServiceTest.java | 453 ++++++++++- .../service/OperateRecordServiceTest.java | 149 ++++ .../service/service/RegionServiceTest.java | 452 +++++++++++ .../service/service/ThrottleServiceTest.java | 144 ++++ .../service/TopicExpiredServiceTest.java | 65 ++ .../service/TopicManagerServiceTest.java | 752 ++++++++++++++++++ 6 files changed, 2013 insertions(+), 2 deletions(-) create mode 100644 kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/OperateRecordServiceTest.java create mode 100644 kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/RegionServiceTest.java create mode 100644 kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/ThrottleServiceTest.java create mode 100644 kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/TopicExpiredServiceTest.java create mode 100644 kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerServiceTest.java diff --git a/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/ConfigServiceTest.java b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/ConfigServiceTest.java index 51276859..644de8f0 100644 --- a/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/ConfigServiceTest.java +++ b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/ConfigServiceTest.java @@ -1,11 +1,460 @@ package com.xiaojukeji.kafka.manager.service.service; +import com.alibaba.fastjson.JSON; +import com.xiaojukeji.kafka.manager.common.constant.ConfigConstant; +import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.ao.config.CreateTopicConfig; +import com.xiaojukeji.kafka.manager.common.entity.ao.config.CreateTopicElemConfig; +import com.xiaojukeji.kafka.manager.common.entity.ao.config.expert.TopicExpiredConfig; +import com.xiaojukeji.kafka.manager.common.entity.dto.config.ConfigDTO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.ConfigDO; import com.xiaojukeji.kafka.manager.service.config.BaseTest; +import org.springframework.beans.factory.annotation.Autowired; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; /** - * @author xuguang - * @Date 2021/12/6 + * @author wyc + * @date 2021/12/9 */ public class ConfigServiceTest extends BaseTest { + @Autowired + private ConfigService configService; + + @DataProvider(name = "configDTO") + public Object[][] provideConfigDO() { + ConfigDTO dto = new ConfigDTO(); + dto.setConfigKey("key1"); + dto.setConfigValue("value1"); + dto.setConfigDescription("test"); + + return new Object[][] {{dto}}; + } + + public ConfigDTO getConfigDTO() { + ConfigDTO dto = new ConfigDTO(); + dto.setConfigKey("key1"); + dto.setConfigValue("value1"); + dto.setConfigDescription("test"); + + return dto; + } + + @Test(dataProvider = "configDTO") + public void insertTest(ConfigDTO dto) { + // 插入时,MySQL错误 + insert2MySQLErrorTest(dto); + + // 插入成功测试 + insert2SuccessTest(dto); + + // 插入时,资源已存在测试 + insert2ResourceExistedTest(dto); + } + + private void insert2SuccessTest(ConfigDTO dto) { + dto.setConfigKey("key1"); + ResultStatus result = configService.insert(dto); + Assert.assertEquals(result, ResultStatus.SUCCESS); + } + + private void insert2ResourceExistedTest(ConfigDTO dto2) { + ResultStatus result2 = configService.insert(dto2); + Assert.assertEquals(result2, ResultStatus.RESOURCE_ALREADY_EXISTED); + } + + private void insert2MySQLErrorTest(ConfigDTO dto) { + dto.setConfigKey(null); + ResultStatus result = configService.insert(dto); + Assert.assertEquals(result, ResultStatus.MYSQL_ERROR); + } + + + @Test + public void deleteByKeyTest() { + // deleteByKey, key时null + deleteByKey2NullTest(); + + // deleteByKey, 配置不存在测试 + deleteByKey2ConfigNotExistTest(); + + // deleteByKey, 成功测试 + deleteByKey2SuccessTest(); + } + + private void deleteByKey2NullTest() { + ResultStatus result = configService.deleteByKey(null); + Assert.assertEquals(result, ResultStatus.PARAM_ILLEGAL); + } + + private void deleteByKey2SuccessTest() { + ConfigDTO dto = getConfigDTO(); + ResultStatus insertResult = configService.insert(dto); + Assert.assertEquals(insertResult, ResultStatus.SUCCESS); + + ResultStatus deleteResult = configService.deleteByKey(dto.getConfigKey()); + Assert.assertEquals(deleteResult, ResultStatus.SUCCESS); + } + + private void deleteByKey2ConfigNotExistTest() { + ResultStatus result = configService.deleteByKey("key"); + Assert.assertEquals(result, ResultStatus.CONFIG_NOT_EXIST); + } + + @Test(dataProvider = "configDTO") + public void updateByKeyTest(ConfigDTO dto) { + configService.insert(dto); + + // updateByKey, 成功测试 + updateByKey2SuccessTest(dto); + + // updateByKey, 配置不存在测试 + updateByKey2ConfigNotExistTest(dto); + } + + private void updateByKey2SuccessTest(ConfigDTO dto) { + dto.setConfigValue("newValue"); + ResultStatus updateResult = configService.updateByKey(dto); + Assert.assertEquals(updateResult, ResultStatus.SUCCESS); + } + + @Test(dataProvider = "configDTO", description = "updateByKey, 配置不存在测试") + private void updateByKey2ConfigNotExistTest(ConfigDTO dto) { + dto.setConfigKey("newKey"); + ResultStatus updateResult = configService.updateByKey(dto); + Assert.assertEquals(updateResult, ResultStatus.CONFIG_NOT_EXIST); + } + +// @Test(dataProvider = "configDTO", description = "updateByKey, MySQL_ERROR测试") +// public void updateByKey2MySQLErrorTest(ConfigDTO dto) { +// dto.setConfigKey(null); +// ResultStatus updateResult = configService.updateByKey(dto); +// Assert.assertEquals(updateResult, ResultStatus.CONFIG_NOT_EXIST); +// } + + + @Test(dataProvider = "configDTO") + public void updateByKeyTest2(ConfigDTO dto) { + configService.insert(dto); + + // updateByKey重载方法,成功测试 + updateByKey2SuccessTest1(dto); + + // updateByKey重载方法,资源不存在测试 + updateByKey2ConfigNotExistTest1(dto); + + } + + private void updateByKey2SuccessTest1(ConfigDTO dto) { + String key = dto.getConfigKey(); + String value = "newValue"; + Assert.assertEquals(configService.updateByKey(key, value), ResultStatus.SUCCESS); + } + + private void updateByKey2ConfigNotExistTest1(ConfigDTO dto) { + Assert.assertEquals(configService.updateByKey("key2", "newValue"), ResultStatus.CONFIG_NOT_EXIST); + } + + + + @Test(dataProvider = "configDTO") + public void getByKeyTest(ConfigDTO dto) { + configService.insert(dto); + + // getByKey, 成功测试 + getByKey2SuccessTest(dto); + + // getByKey, 获取失败测试 + getByKey2NullTest(); + } + + private void getByKey2SuccessTest(ConfigDTO dto) { + ConfigDO result = configService.getByKey(dto.getConfigKey()); + Assert.assertNotNull(result); + Assert.assertTrue(result.getConfigKey().equals(dto.getConfigKey()) && + result.getConfigValue().equals(dto.getConfigValue()) && + result.getConfigDescription().equals(dto.getConfigDescription())); + } + + private void getByKey2NullTest() { + Assert.assertNull(configService.getByKey("key2")); + } + + + + @Test(dataProvider = "configDTO") + public void getByKeyTest2(ConfigDTO dto) { + // 需要用到TopicExpiredConfig类 + TopicExpiredConfig config = getTopicExpiredConfig(); + dto.setConfigValue(JSON.toJSONString(config)); + Assert.assertEquals(configService.insert(dto), ResultStatus.SUCCESS); + + // getByKey, 成功测试 + getByKey2SuccessTest1(dto); + + // getByKey, 返回null测试 + getByKey2NullTest1(dto); + } + + private TopicExpiredConfig getTopicExpiredConfig() { + TopicExpiredConfig config = new TopicExpiredConfig(); + List list = new ArrayList<>(); + list.add(1L); + list.add(2L); + config.setIgnoreClusterIdList(list); + return config; + } + + private void getByKey2SuccessTest1(ConfigDTO dto) { + TopicExpiredConfig result = configService.getByKey(dto.getConfigKey(), TopicExpiredConfig.class); + Assert.assertEquals(result.toString(), getTopicExpiredConfig().toString()); + } + + private void getByKey2NullTest1(ConfigDTO dto) { + Assert.assertNull(configService.getByKey("key", TopicExpiredConfig.class)); + } + + + + + @Test(dataProvider = "configDTO") + public void getArrayByKeyTest(ConfigDTO dto) { + dto.setConfigValue(JSON.toJSONString(getStringArray())); + Assert.assertEquals(configService.insert(dto), ResultStatus.SUCCESS); + + // getArrayByKey 成功测试 + getArrayByKey2SuccessTest(dto); + + // getArrayByKey 返回null测试 + getArrayByKey2NullTest(); + } + + private List getStringArray() { + List list = new ArrayList<>(); + list.add("value1"); + list.add("value2"); + list.add("value3"); + return list; + } + + private void getArrayByKey2SuccessTest(ConfigDTO dto) { + List result = configService.getArrayByKey(dto.getConfigKey(), String.class); + Assert.assertEquals(result, getStringArray()); + } + + + private void getArrayByKey2NullTest() { + Assert.assertNull(configService.getArrayByKey(null, String.class)); + } + + + + @Test(dataProvider = "configDTO", description = "getLongValue, 成功测试") + public void getLongValue2SuccessTest(ConfigDTO dto) { + dto.setConfigValue("100"); + Assert.assertEquals(configService.insert(dto), ResultStatus.SUCCESS); + Assert.assertEquals(configService.getLongValue(dto.getConfigKey(), 0L), Long.valueOf(dto.getConfigValue())); + } + + @Test(description = "getLongValue, 不存在key,返回默认值测试") + public void getLongValue2NotExistTest() { + Assert.assertEquals(configService.getLongValue("key", 100L), Long.valueOf(100L)); + } + + @Test(dataProvider = "configDTO", description = "getLongValue, 存在key但是value是null") + public void getLongValue2ValueIsNull(ConfigDTO dto) { + dto.setConfigValue(null); + Assert.assertEquals(configService.insert(dto), ResultStatus.SUCCESS); + + Assert.assertEquals(configService.getLongValue(dto.getConfigKey(), 100L), Long.valueOf(100L)); + } + + + @Test(dataProvider = "configDTO", description = "listAll, 成功测试") + public void listAll2SuccessTest(ConfigDTO dto) { + Assert.assertEquals(configService.insert(dto), ResultStatus.SUCCESS); + List result = configService.listAll(); + Assert.assertNotNull(result); + + List list = new ArrayList<>(); + list.add(dto); + + // 判断key字段是否相同 + Assert.assertEquals(result.stream().map(ConfigDO::getConfigKey).collect(Collectors.toList()), + list.stream().map(ConfigDTO::getConfigKey).collect(Collectors.toList())); + + Assert.assertEquals(result.stream().map(ConfigDO::getConfigValue).collect(Collectors.toList()), + list.stream().map(ConfigDTO::getConfigValue).collect(Collectors.toList())); + + } + + + public CreateTopicConfig getCreateTopicConfig() { + return new CreateTopicConfig(); + } + + public ConfigDTO getConfigDTO1() { + ConfigDTO dto = new ConfigDTO(); + dto.setConfigKey(TopicCreationConstant.INNER_CREATE_TOPIC_CONFIG_KEY); + dto.setConfigValue(JSON.toJSONString(getCreateTopicConfig())); + dto.setConfigDescription("test"); + return dto; + } + + @Test(description = "getAutoPassedTopicApplyOrderNumPerTask, config表中不存在INNER_CREATE_TOPIC_CONFIG_KEY" + + "对应的记录,返回默认值测试") + public void getAutoPassedTopicApplyOrderNumPerTask2NotExistTest() { + Assert.assertEquals(configService.getAutoPassedTopicApplyOrderNumPerTask(), TopicCreationConstant.DEFAULT_MAX_PASSED_ORDER_NUM_PER_TASK); + } + + @Test(description = "getAutoPassedTopicApplyOrderNumPerTask, 查到的记录中,记录的maxPassedOrderNumPerTask属性为null测试") + public void getAutoPassedTopicApplyOrderNumPerTask2NullTest() { + configService.insert(getConfigDTO1()); + Assert.assertEquals(configService.getAutoPassedTopicApplyOrderNumPerTask(), TopicCreationConstant.DEFAULT_MAX_PASSED_ORDER_NUM_PER_TASK); + } + + + @Test(description = "getAutoPassedTopicApplyOrderNumPerTask, 查到的记录中,记录的maxPassedOrderNumPerTask" + + "比TopicCreationConstant.MAX_PASSED_ORDER_NUM_PER_TASK大时测试") + public void getAutoPassedTopicApplyOrderNumPerTask2BiggerMaxTest() { + ConfigDTO configDTO = getConfigDTO1(); + CreateTopicConfig createTopicConfig = getCreateTopicConfig(); + createTopicConfig.setMaxPassedOrderNumPerTask(TopicCreationConstant.MAX_PASSED_ORDER_NUM_PER_TASK + 10); + configDTO.setConfigValue(JSON.toJSONString(createTopicConfig)); + + configService.insert(configDTO); + Assert.assertEquals(configService.getAutoPassedTopicApplyOrderNumPerTask(), TopicCreationConstant.MAX_PASSED_ORDER_NUM_PER_TASK); + } + + @Test(description = "getAutoPassedTopicApplyOrderNumPerTask, 查到的记录中,记录的maxPassedOrderNumPerTask" + + "比TopicCreationConstant.MAX_PASSED_ORDER_NUM_PER_TASK小时测试") + public void getAutoPassedTopicApplyOrderNumPerTask2SmallerMaxTest() { + ConfigDTO configDTO = getConfigDTO1(); + CreateTopicConfig createTopicConfig = getCreateTopicConfig(); + int val = TopicCreationConstant.MAX_PASSED_ORDER_NUM_PER_TASK - 10; + createTopicConfig.setMaxPassedOrderNumPerTask(val); + configDTO.setConfigValue(JSON.toJSONString(createTopicConfig)); + + configService.insert(configDTO); + Assert.assertEquals(configService.getAutoPassedTopicApplyOrderNumPerTask(), Integer.valueOf(val)); + } + + + + + + + public CreateTopicElemConfig getCreateTopicElemConfig(Long clusterId) { + CreateTopicElemConfig config = new CreateTopicElemConfig(); + config.setClusterId(clusterId); + config.setBrokerIdList(new ArrayList<>()); + config.setRegionIdList(new ArrayList<>()); + config.setPartitionNum(TopicCreationConstant.DEFAULT_PARTITION_NUM); + config.setReplicaNum(TopicCreationConstant.DEFAULT_REPLICA); + config.setRetentionTimeUnitHour(TopicCreationConstant.DEFAULT_RETENTION_TIME_UNIT_HOUR); + config.setAutoExecMaxPeakBytesInUnitB(TopicCreationConstant.AUTO_EXEC_MAX_BYTES_IN_UNIT_B); + return config; + } + + @Test(description = "getCreateTopicConfig, config表中不存在key时测试") + public void getCreateTopicConfig2NotExistKeyTest() { + CreateTopicElemConfig createTopicElemConfig = getCreateTopicElemConfig(10L); + Assert.assertEquals(configService.getCreateTopicConfig(10L, "systemCode").toString(), createTopicElemConfig.toString()); + } + + @Test(description = "getCreateTopicConfig, value中存在和clusterId一致的记录") + public void getCreateTopicConfig2ExistTest() { + CreateTopicElemConfig createTopicElemConfig = getCreateTopicElemConfig(10L); + createTopicElemConfig.setReplicaNum(4); + List list = new ArrayList<>(); + list.add(createTopicElemConfig); + + CreateTopicConfig createTopicConfig = getCreateTopicConfig(); + createTopicConfig.setConfigList(list); + + ConfigDTO configDTO = getConfigDTO1(); + configDTO.setConfigValue(JSON.toJSONString(createTopicConfig)); + configService.insert(configDTO); + + Assert.assertEquals(configService.getCreateTopicConfig(10L, "systemCode").toString(), createTopicElemConfig.toString()); + } + + @Test(description = "getCreateTopicConfig, value中不存在和clusterId一致的记录") + public void getCreateTopicConfig2NotExitConfigEleTest() { + CreateTopicElemConfig createTopicElemConfig = getCreateTopicElemConfig(11L); + createTopicElemConfig.setReplicaNum(4); + List list = new ArrayList<>(); + list.add(createTopicElemConfig); + + CreateTopicConfig createTopicConfig = getCreateTopicConfig(); + createTopicConfig.setConfigList(list); + + ConfigDTO configDTO = getConfigDTO1(); + configDTO.setConfigValue(JSON.toJSONString(createTopicConfig)); + configService.insert(configDTO); + + Assert.assertEquals(configService.getCreateTopicConfig(10L, "systemCode").toString(), getCreateTopicElemConfig(10L).toString()); + } + + + + + public ConfigDTO getConfigDTO2() { + ConfigDTO dto = new ConfigDTO(); + dto.setConfigKey(ConfigConstant.KAFKA_CLUSTER_DO_CONFIG_KEY); + dto.setConfigDescription("test"); + return dto; + } + public ConfigDO getConfigDO() { + return new ConfigDO(); + } + + @Test(description = "getClusterDO, config表中不存在ConfigConstant.KAFKA_CLUSTER_DO_CONFIG_KEY这个key") + public void getClusterDO2NotExistKeyTest() { + Assert.assertNull(configService.getClusterDO(10L)); + } + + @Test(description = "getClusterDO, config表中key对应的value没法解析成ConfigDO测试") + public void getClusterDO2ParseFailTest() { + ConfigDTO configDTO2 = getConfigDTO2(); + configDTO2.setConfigValue("value"); + configService.insert(configDTO2); + + Assert.assertNull(configService.getClusterDO(10L)); + } + public List getClusterDOList() { + ClusterDO clusterDO1 = new ClusterDO(); + clusterDO1.setId(10L); + clusterDO1.setClusterName("test1"); + ClusterDO clusterDO2 = new ClusterDO(); + clusterDO2.setId(20L); + clusterDO2.setClusterName("test2"); + List list = new ArrayList<>(); + list.add(clusterDO1); + list.add(clusterDO2); + return list; + } + + @Test(description = "getClusterDO, 成功查到测试") + public void getClusterDO2SuccessTest() { + ConfigDTO configDTO2 = getConfigDTO2(); + List clusterDOList = getClusterDOList(); + configDTO2.setConfigValue(JSON.toJSONString(clusterDOList)); + configService.insert(configDTO2); + ClusterDO clusterDO = new ClusterDO(); + clusterDO.setId(20L); + clusterDO.setClusterName("test2"); + + Assert.assertEquals(configService.getClusterDO(20L), clusterDO); + } + } diff --git a/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/OperateRecordServiceTest.java b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/OperateRecordServiceTest.java new file mode 100644 index 00000000..6ab0c64a --- /dev/null +++ b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/OperateRecordServiceTest.java @@ -0,0 +1,149 @@ +package com.xiaojukeji.kafka.manager.service.service; + +import com.xiaojukeji.kafka.manager.common.bizenum.ModuleEnum; +import com.xiaojukeji.kafka.manager.common.bizenum.OperateEnum; +import com.xiaojukeji.kafka.manager.common.entity.dto.rd.OperateRecordDTO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.OperateRecordDO; +import com.xiaojukeji.kafka.manager.service.config.BaseTest; +import org.springframework.beans.factory.annotation.Autowired; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author wyc + * @date 2021/12/8 + */ +public class OperateRecordServiceTest extends BaseTest { + @Autowired + private OperateRecordService operateRecordService; + + @DataProvider(name = "operateRecordDO") + public Object[][] provideOperateRecordDO() { + OperateRecordDO operateRecordDO = new OperateRecordDO(); + operateRecordDO.setId(3L); + // 0:topic, 1:应用, 2:配额, 3:权限, 4:集群, 5:分区, 6:Gateway配置, -1:未知 + operateRecordDO.setModuleId(ModuleEnum.CLUSTER.getCode()); + // 0:新增, 1:删除, 2:修改 + operateRecordDO.setOperateId(OperateEnum.ADD.getCode()); + // topic名称、app名称 + operateRecordDO.setResource("testOpRecord"); + operateRecordDO.setContent("testContent"); + operateRecordDO.setOperator("admin"); + return new Object[][] {{operateRecordDO}}; + } + + + private OperateRecordDTO getOperateRecordDTO() { + OperateRecordDTO dto = new OperateRecordDTO(); + dto.setModuleId(ModuleEnum.CLUSTER.getCode()); + dto.setOperateId(OperateEnum.ADD.getCode()); + dto.setOperator("admin"); + return dto; + } + + + @Test(dataProvider = "operateRecordDO", description = "插入操作记录成功测试") + public void insert2SuccessTest(OperateRecordDO operateRecordDO) { + int result = operateRecordService.insert(operateRecordDO); + Assert.assertEquals(result, 1); + } + +// @Test(dataProvider = "operateRecordDO", description = "插入操作记录失败测试") +// public void insert2FailureTest(OperateRecordDO operateRecordDO) { +// operateRecordDO.setResource(null); +// int result = operateRecordService.insert(operateRecordDO); +// Assert.assertEquals(result, 0); +// } + + + @Test(description = "插入的重载方法操作成功测试") + public void insert2SuccessTest1() { + Map content = new HashMap<>(); + content.put("key", "value"); + int result = operateRecordService.insert("admin", ModuleEnum.CLUSTER, "testOpRecord", OperateEnum.ADD, content); + Assert.assertEquals(result, 1); + } + +// @Test(description = "插入的重载方法操作失败测试") +// public void insert2FailureTest1() { +// Map content = new HashMap<>(); +// content.put("key", "value"); +// int result = operateRecordService.insert(null, ModuleEnum.CLUSTER, "testOpRecord", OperateEnum.ADD, content); +// Assert.assertEquals(result, 0); +// } + + @Test(dataProvider = "operateRecordDO") + public void queryByConditionTest(OperateRecordDO operateRecordDO) { + operateRecordService.insert(operateRecordDO); + // endTime和startTime都是null + queryByConditionTest3(operateRecordDO); + + // startTime是null + queryByConditionTest1(operateRecordDO); + + // endTime是null + queryByConditionTest2(operateRecordDO); + + // endTime和startTime都不是null + queryByConditionTest4(operateRecordDO); + + } + + + private void queryByConditionTest1(OperateRecordDO operateRecordDO) { + OperateRecordDTO dto = getOperateRecordDTO(); + dto.setEndTime(new Date().getTime()); + List queryResult = operateRecordService.queryByCondition(dto); + Assert.assertFalse(queryResult.isEmpty()); + // 判断查询得到的OperateRecordDO中日期是否符合要求 + Assert.assertTrue(queryResult.stream().allMatch(operateRecordDO1 -> + operateRecordDO1.getCreateTime().after(new Date(0L)) && + operateRecordDO1.getCreateTime().before(new Date()) && + operateRecordDO1.getModuleId().equals(dto.getModuleId()) && + operateRecordDO1.getOperateId().equals(dto.getOperateId()) && + operateRecordDO1.getOperator().equals(dto.getOperator()))); + } + + private void queryByConditionTest2(OperateRecordDO operateRecordDO) { + OperateRecordDTO dto = getOperateRecordDTO(); + dto.setStartTime(new Date().getTime()); + // 查询的是create_time >= startTime, 因为创建时间在当前时间之前,因此查到的数据是空的 + List queryResult = operateRecordService.queryByCondition(dto); + Assert.assertTrue(queryResult.isEmpty()); + } + + + private void queryByConditionTest3(OperateRecordDO operateRecordDO) { + OperateRecordDTO dto = getOperateRecordDTO(); + List queryResult = operateRecordService.queryByCondition(dto); + Assert.assertFalse(queryResult.isEmpty()); + + Assert.assertTrue(queryResult.stream().allMatch(operateRecordDO1 -> + operateRecordDO1.getCreateTime().after(new Date(0L)) && + operateRecordDO1.getCreateTime().before(new Date()) && + operateRecordDO1.getModuleId().equals(dto.getModuleId()) && + operateRecordDO1.getOperateId().equals(dto.getOperateId()) && + operateRecordDO1.getOperator().equals(dto.getOperator()))); + } + + private void queryByConditionTest4(OperateRecordDO operateRecordDO) { + OperateRecordDTO dto = getOperateRecordDTO(); + dto.setStartTime(0L); + dto.setEndTime(1649036393371L); + List queryResult = operateRecordService.queryByCondition(dto); + Assert.assertFalse(queryResult.isEmpty()); + + Assert.assertTrue(queryResult.stream().allMatch(operateRecordDO1 -> + operateRecordDO1.getCreateTime().after(new Date(0L)) && + operateRecordDO1.getCreateTime().before(new Date()) && + operateRecordDO1.getModuleId().equals(dto.getModuleId()) && + operateRecordDO1.getOperateId().equals(dto.getOperateId()) && + operateRecordDO1.getOperator().equals(dto.getOperator()))); + } +} diff --git a/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/RegionServiceTest.java b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/RegionServiceTest.java new file mode 100644 index 00000000..e63f5f59 --- /dev/null +++ b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/RegionServiceTest.java @@ -0,0 +1,452 @@ +package com.xiaojukeji.kafka.manager.service.service; + +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.pojo.LogicalClusterDO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.RegionDO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO; +import com.xiaojukeji.kafka.manager.common.utils.ListUtils; +import com.xiaojukeji.kafka.manager.service.config.BaseTest; +import org.springframework.beans.factory.annotation.Autowired; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.*; +import java.util.stream.Collectors; + +/** + * @author wyc + * @date 2021/12/8 + */ +public class RegionServiceTest extends BaseTest{ + @Autowired + private RegionService regionService; + + @DataProvider(name = "regionDO") + public Object[][] provideRegionDO() { + RegionDO regionDO = new RegionDO(); + regionDO.setStatus(0); + regionDO.setName("region1"); + // 物理集群id + regionDO.setClusterId(1L); + regionDO.setDescription("test"); + + List brokerIdList = new ArrayList<>(); + brokerIdList.add(1); + brokerIdList.add(2); + regionDO.setBrokerList(ListUtils.intList2String(brokerIdList)); + + return new Object[][] {{regionDO}}; + } + + + @Test(description = "creatRegion, 参数为null测试") + public void createRegion2ParamIllegalTest() { + Assert.assertEquals(regionService.createRegion(null), ResultStatus.PARAM_ILLEGAL); + } + + @Test(dataProvider = "regionDO", description = "createRegion, 成功测试") + public void createRegion2SuccessTest(RegionDO regionDO) { + Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS); + } + + @Test(dataProvider = "regionDO", description = "createRegion, clusterId为空测试") + public void createRegion2ExistBrokerIdAlreadyInRegionTest1(RegionDO regionDO) { + regionDO.setClusterId(null); + Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.RESOURCE_ALREADY_USED); + } + + + @Test(dataProvider = "regionDO", description = "createRegion, 创建时传入的brokerList中有被使用过的") + public void createRegion2ExistBrokerIdAlreadyInRegionTest2(RegionDO regionDO) { + // 首先创建一个Region, 使用1,2broker + Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS); + + // 再创建一个Region, 使用1,3broker + List newBrokerIdList = new ArrayList<>(); + newBrokerIdList.add(1); + newBrokerIdList.add(3); + regionDO.setBrokerList(ListUtils.intList2String(newBrokerIdList)); + Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.RESOURCE_ALREADY_USED); + } + + @Test(dataProvider = "regionDO", description = "createRegion, 创建时,region使用到的broker挂掉了") + public void createRegion2BrokerNotExistTest(RegionDO regionDO) { + // 传入一个不存在的物理集群,检测时,会认为该集群存活的broker个数为0 + regionDO.setClusterId(5L); + Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.BROKER_NOT_EXIST); + } + + @Test(dataProvider = "regionDO", description = "createRegion, 创建时,regionName重复") + public void createRegion2ResourceAlreadyExistTest(RegionDO regionDO) { + // 先插入一个 + Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS); + + // 插入同名Region,注意brokerList需要保持不一样,不然会返回RESOURCE_ALREADY_USED + List brokerIdList = new ArrayList<>(); + brokerIdList.add(3); + regionDO.setBrokerList(ListUtils.intList2String(brokerIdList)); + Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.RESOURCE_ALREADY_EXISTED); + } + + + @Test(dataProvider = "regionDO") + public void deleteByIdTest(RegionDO regionDO) { + // 参数非法测试 + deleteById2ParamIllegalTest(regionDO); + + // 资源不存在测试 + deleteById2ResourceNotExistTest(regionDO); + + // 删除成功测试 + deleteById2SuccessTest(regionDO); + + } + + private void deleteById2ParamIllegalTest(RegionDO regionDO) { + Assert.assertEquals(regionService.deleteById(null), ResultStatus.PARAM_ILLEGAL); + } + + private void deleteById2ResourceNotExistTest(RegionDO regionDO) { + Assert.assertEquals(regionService.deleteById(10L), ResultStatus.RESOURCE_NOT_EXIST); + } + + private void deleteById2SuccessTest(RegionDO regionDO) { + regionDO.setId(1L); + Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS); + + // 插入时,xml文件中没用到id,id交给数据库自增,因此需要先查出Region的id,再根据id删除 + List regionDOList = regionService.getByClusterId(1L); + RegionDO region = regionDOList.get(0); + Assert.assertEquals(regionService.deleteById(region.getId()), ResultStatus.SUCCESS); + } + + + @Test(dataProvider = "regionDO", description = "updateRegion, 参数非法测试") + public void updateRegion2ParamIllegalTest1(RegionDO regionDO) { + Assert.assertEquals(regionService.updateRegion(null), ResultStatus.PARAM_ILLEGAL); + Assert.assertEquals(regionService.updateRegion(regionDO), ResultStatus.PARAM_ILLEGAL); + } + + @Test(dataProvider = "regionDO", description = "updateRegion, 资源不存在测试") + public void updateRegion2ResourceNotExistTest1(RegionDO regionDO) { + // 不插入Region,直接更新 + regionDO.setId(1L); + Assert.assertEquals(regionService.updateRegion(regionDO), ResultStatus.RESOURCE_NOT_EXIST); + } + + @Test(dataProvider = "regionDO", description = "updateRegion, brokerList未改变,成功测试") + public void updateRegion2SuccessWithBrokerListNotChangeTest1(RegionDO regionDO) { + // 先在数据库中创建一个Region + Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS); + + // 查询出创建的Region,并修改一些参数后,作为新的Region + List regionDOList = regionService.getByClusterId(1L); + RegionDO newRegionDO = regionDOList.get(0); + newRegionDO.setStatus(1); + + Assert.assertEquals(regionService.updateRegion(newRegionDO), ResultStatus.SUCCESS); + } + + @Test(dataProvider = "regionDO", description = "updateRegion, 传入的broker已经被使用测试") + public void updateRegion2ResourceAlreadyUsedTest1(RegionDO regionDO) { + // 先在数据库中创建一个Region + Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS); + + // 查询出创建的Region,并修改brokerList后,作为新的Region + List regionDOList = regionService.getByClusterId(1L); + RegionDO newRegionDO = regionDOList.get(0); + + List newBrokerIdList = new ArrayList<>(); + newBrokerIdList.add(1); + newBrokerIdList.add(3); + + // 更新Region的brokerList + newRegionDO.setBrokerList(ListUtils.intList2String(newBrokerIdList)); + // 构造情况 + newRegionDO.setClusterId(null); + Assert.assertEquals(regionService.updateRegion(newRegionDO), ResultStatus.RESOURCE_ALREADY_USED); + } + + @Test(dataProvider = "regionDO", description = "updateRegion, 更新的broker不存在") + public void updateRegion2BrokerNotExistTest1(RegionDO regionDO) { + // 先在数据库中创建一个Region + Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS); + + // 查询出创建的Region,并修改brokerList后,作为新的Region + List regionDOList = regionService.getByClusterId(1L); + RegionDO newRegionDO = regionDOList.get(0); + + // 构造情况 + List newBrokerIdList = new ArrayList<>(); + newBrokerIdList.add(4); + newBrokerIdList.add(5); + newRegionDO.setBrokerList(ListUtils.intList2String(newBrokerIdList)); + + Assert.assertEquals(regionService.updateRegion(newRegionDO), ResultStatus.BROKER_NOT_EXIST); + } + + + @Test(dataProvider = "regionDO", description = "updateRegion, brokeList发生了改变,成功测试") + public void updateRegion2SuccessWithBrokerListChangeTest1(RegionDO regionDO) { + // 先在数据库中创建一个Region + Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS); + + // 查询出创建的Region,并修改brokerList后,作为新的Region + List regionDOList = regionService.getByClusterId(1L); + RegionDO newRegionDO = regionDOList.get(0); + + // 构造情况 + List newBrokerIdList = new ArrayList<>(); + newBrokerIdList.add(1); + newBrokerIdList.add(3); + newRegionDO.setBrokerList(ListUtils.intList2String(newBrokerIdList)); + + Assert.assertEquals(regionService.updateRegion(newRegionDO), ResultStatus.SUCCESS); + } + + @Test(dataProvider = "regionDO", description = "updateRegion重载方法,参数非法测试") + public void updateRegion2ParamIllegalTest2(RegionDO regionDO) { + Assert.assertEquals(regionService.updateRegion(null, "1,3"), ResultStatus.PARAM_ILLEGAL); + Assert.assertEquals(regionService.updateRegion(1L, "1, 3"), ResultStatus.PARAM_ILLEGAL); + } + + @Test(dataProvider = "regionDO", description = "updateRegion重载方法,成功测试") + public void updateRegion2SuccessTest2(RegionDO regionDO) { + Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS); + List regionDOList = regionService.getByClusterId(1L); + RegionDO region = regionDOList.get(0); + Assert.assertEquals(regionService.updateRegion(region.getId(), "1,3"), ResultStatus.SUCCESS); + } + + + @Test(dataProvider = "regionDO") + public void updateCapacityByIdTest(RegionDO regionDO) { + Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS); + RegionDO region = regionService.getByClusterId(1L).get(0); + region.setCapacity(1000L); + + // 成功测试 + updateCapacityById2SuccessTest(region); + + // 失败测试 + // 集群中不存在regionId是100的 + region.setId(100L); + updateCapacityByIdFailureTest(region); + } + + private void updateCapacityById2SuccessTest(RegionDO regionDO) { + Assert.assertEquals(regionService.updateCapacityById(regionDO), 1); + } + + private void updateCapacityByIdFailureTest(RegionDO regionDO) { + Assert.assertEquals(regionService.updateCapacityById(regionDO), 0); + } + + + @Test(dataProvider = "regionDO") + public void getByIdTest(RegionDO regionDO) { + Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS); + + // 获取成功测试 + RegionDO region = regionService.getByClusterId(1L).get(0); + getById2SuccessTest(region); + + // 获取失败测试 + region.setId(-1L); + getById2FailureTest(region); + + } + + private void getById2SuccessTest(RegionDO regionDO) { + Assert.assertEquals(regionService.getById(regionDO.getId()).toString(), regionDO.toString()); + } + + private void getById2FailureTest(RegionDO regionDO) { + Assert.assertNull(regionService.getById(regionDO.getId())); + } + + @Test(dataProvider = "regionDO") + public void getByClusterIdTest(RegionDO regionDO) { + regionService.createRegion(regionDO); + + // 获取成功测试 + getByClusterId2SuccessTest(regionDO); + + // 获取失败测试 + getByClusterId2FailureTest(regionDO); + } + + private void getByClusterId2SuccessTest(RegionDO regionDO) { + Assert.assertNotNull(regionService.getByClusterId(regionDO.getClusterId())); + Assert.assertTrue(regionService.getByClusterId(regionDO.getClusterId()).stream().allMatch(regionDO1 -> + regionDO1.getName().equals(regionDO.getName()) && + regionDO1.getBrokerList().equals(regionDO.getBrokerList()))); + } + + private void getByClusterId2FailureTest(RegionDO regionDO) { + Assert.assertTrue(regionService.getByClusterId(-1L).isEmpty()); + } + + @Test(dataProvider = "regionDO") + public void listAllTest(RegionDO regionDO) { + Assert.assertTrue(regionService.listAll().isEmpty()); + regionService.createRegion(regionDO); + Assert.assertNotNull(regionService.listAll()); + + Assert.assertTrue(regionService.listAll().stream().allMatch(regionDO1 -> + regionDO1.getName().equals(regionDO.getName()) && + regionDO1.getBrokerList().equals(regionDO.getBrokerList()))); + } + + @Test(dataProvider = "regionDO") + public void getRegionNumTest(RegionDO regionDO) { + // 插入一条数据 + regionService.createRegion(regionDO); + + Map regionNum = regionService.getRegionNum(); + for(Map.Entry entry : regionNum.entrySet()) { + Assert.assertEquals(entry.getKey(), Long.valueOf(1)); + Assert.assertEquals(entry.getValue(), Integer.valueOf(1)); + } + } + + @Test(dataProvider = "regionDO") + public void getFullBrokerIdListTest(RegionDO regionDO) { + List brokerIdList = new ArrayList<>(); + brokerIdList.add(3); + + // regionId是null测试 + getFullBrokerIdList2RegionIdIsNullTest(regionDO, brokerIdList); + + // 数据库中不存在对应的regionId数据 + getFullBrokerIdList2RegionNotExistTest(regionDO, brokerIdList); + + Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS); + RegionDO region = regionService.getByClusterId(1L).get(0); + // 传进来的brokerList是空的 + getFullBrokerIdList2BrokerIdListIsEmpty(regionDO, region, new ArrayList<>()); + + // 传进来的brokerList不是空的 + getFullBrokerIdList2Success(regionDO, region, brokerIdList); + + } + + private void getFullBrokerIdList2RegionIdIsNullTest(RegionDO regionDO, List brokerIdList) { + List fullBrokerIdList = regionService.getFullBrokerIdList(1L, null, brokerIdList); + Assert.assertEquals(fullBrokerIdList, brokerIdList); + } + + private void getFullBrokerIdList2RegionNotExistTest(RegionDO regionDO, List brokerIdList) { + Assert.assertEquals(regionService.getFullBrokerIdList(1L, -1L, brokerIdList), brokerIdList); + } + + private void getFullBrokerIdList2BrokerIdListIsEmpty(RegionDO regionDO, RegionDO regionInDataBase, List brokerIdList) { + List fullBrokerIdList = regionService.getFullBrokerIdList(1L, regionInDataBase.getId(), brokerIdList); + Assert.assertEquals(fullBrokerIdList, ListUtils.string2IntList(regionInDataBase.getBrokerList())); + } + + private void getFullBrokerIdList2Success(RegionDO regionDO, RegionDO regionInDataBase, List brokerIdList) { + List fullBrokerIdList = regionService.getFullBrokerIdList(1L, regionInDataBase.getId(), brokerIdList); + List allBrokerIdList = ListUtils.string2IntList(regionInDataBase.getBrokerList()); + allBrokerIdList.addAll(brokerIdList); + Assert.assertEquals(allBrokerIdList, fullBrokerIdList); + } + + @Test(dataProvider = "regionDO") + public void convert2BrokerIdRegionMapTest(RegionDO regionDO) { + Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS); + List regionDOList = regionService.getByClusterId(1L); + + // regionDOList是null测试 + convert2BrokerIdRegionMap2RegionListDOIsNull(); + + // 成功测试 + convert2BrokerIdRegionMap2Success(regionDO); + } + + private void convert2BrokerIdRegionMap2RegionListDOIsNull() { + Assert.assertTrue(regionService.convert2BrokerIdRegionMap(null).isEmpty()); + } + + private void convert2BrokerIdRegionMap2Success(RegionDO regionDO) { + // 预期结果, key是brokerId, value是Region + List regionDOList = regionService.getByClusterId(1L); + RegionDO region = regionDOList.get(0); + Map brokerIdRegionDOMap = ListUtils.string2IntList(regionDO.getBrokerList()).stream().collect(Collectors.toMap(brokerId -> brokerId, regionDO1 -> region)); + + + // 实际结果 + Map result = regionService.convert2BrokerIdRegionMap(regionDOList); + Assert.assertEquals(brokerIdRegionDOMap, result); + } + + + @Test(dataProvider = "regionDO") + public void getIdleRegionBrokerListTest(RegionDO regionDO) { + // 物理集群id和regionIdList是null测试 + getIdleRegionBrokerList2PhysicalClusterIdIsNullTest(); + + // 参数物理集群下的regionDOList为空测试 + getIdleRegionBrokerList2RegionDOListIsEmptyTest(); + + // 成功测试 + getIdleRegionBrokerList2SuccessTest(regionDO); + } + + private void getIdleRegionBrokerList2PhysicalClusterIdIsNullTest() { + Assert.assertNull(regionService.getIdleRegionBrokerList(null, new ArrayList<>())); + } + + private void getIdleRegionBrokerList2RegionDOListIsEmptyTest() { + List regionIdList = new ArrayList<>(); + regionIdList.add(1L); + Assert.assertNull(regionService.getIdleRegionBrokerList(1L, regionIdList)); + } + + private void getIdleRegionBrokerList2SuccessTest(RegionDO regionDO) { + // 先插入 + regionService.createRegion(regionDO); + // 从数据库中查找 + List regionIdList = regionService.getByClusterId(1L).stream().map(RegionDO::getId).collect(Collectors.toList()); + List brokerIdList = regionService.getByClusterId(1L) + .stream().flatMap(regionDO1 -> ListUtils.string2IntList(regionDO1.getBrokerList()).stream()) + .collect(Collectors.toList()); + Assert.assertEquals(regionService.getIdleRegionBrokerList(1L, regionIdList), brokerIdList); + } + + @Test + public void getTopicNameRegionBrokerIdMap2SuccessTest() { + // 创建逻辑集群,创建Topic,均已在数据库写入 + // 逻辑集群基于物理集群1建立,region的brokerList是1,2 + // Topic基于region建立,也就是使用到broker1和2 + + // 这个方法是返回topicName -> topic所使用broker以及这些broker所在region中所有的broker + Map> topicNameRegionBrokerIdMap = regionService.getTopicNameRegionBrokerIdMap(1L); + Map> expectedMap = new HashMap<>(); + Set set = new HashSet<>(); + set.add(1); + set.add(2); + expectedMap.put("topic_a", set); + Assert.assertEquals(topicNameRegionBrokerIdMap, expectedMap); + } + + @Test + public void getRegionListByTopicNameTest() { + // 数据库中依然建立了Region, LogicalCluster, Topic + getRegionListByTopicName2EmptyTest(); + + // 返回集合不为空测试 + getRegionListByTopicName2Success(); + } + + private void getRegionListByTopicName2EmptyTest() { + // 传入一个不存在的topic + Assert.assertEquals(regionService.getRegionListByTopicName(1L, "notExistTopic"), new ArrayList<>()); + } + + private void getRegionListByTopicName2Success() { + List expectedResult = regionService.getByClusterId(1L); + Assert.assertEquals(regionService.getRegionListByTopicName(1L, "topic_a"), expectedResult); + } +} diff --git a/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/ThrottleServiceTest.java b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/ThrottleServiceTest.java new file mode 100644 index 00000000..7a3b9c39 --- /dev/null +++ b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/ThrottleServiceTest.java @@ -0,0 +1,144 @@ +package com.xiaojukeji.kafka.manager.service.service; + +import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum; +import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicThrottledMetrics; +import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicThrottledMetricsDO; +import com.xiaojukeji.kafka.manager.dao.TopicThrottledMetricsDao; +import com.xiaojukeji.kafka.manager.service.config.BaseTest; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.springframework.beans.factory.annotation.Autowired; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.*; + +/** + * @author wyc + * @date 2021/12/24 + */ +public class ThrottleServiceTest extends BaseTest { + + private final static Long REAL_CLUSTER_ID_IN_MYSQL = 1L; + + private final static String REAL_TOPIC_IN_ZK = "topic_a"; + + private final static String ADMIN_NAME_IN_MYSQL = "admin"; + + private final static String KAFKA_MANAGER_APP_NAME = "KM管理员"; + + private final static String KAFKA_MANAGER_APP_ID = "dkm_admin"; + + private final static Set REAL_BROKER_ID_SET = new HashSet<>(); + + private final static String REAL_REGION_IN_CLUSTER = "region1"; + + private final static String REAL_LOGICAL_CLUSTER_NAME = "logical_cluster_1"; + + // 共享集群 + private final static Integer REAL_LOGICAL_CLUSTER_MODE = 0; + + static { + REAL_BROKER_ID_SET.add(1); + REAL_BROKER_ID_SET.add(2); + } + + @Autowired + @InjectMocks + private ThrottleService throttleService; + + @Mock + private TopicThrottledMetricsDao topicThrottleDao; + + @Mock + private JmxService jmxService; + + + @BeforeMethod + public void init() { + MockitoAnnotations.initMocks(this); + } + + private TopicThrottledMetricsDO getTopicThrottledMetricsDO() { + TopicThrottledMetricsDO metricsDO = new TopicThrottledMetricsDO(); + metricsDO.setAppId(KAFKA_MANAGER_APP_ID); + metricsDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL); + metricsDO.setTopicName(REAL_TOPIC_IN_ZK); + return metricsDO; + } + + private TopicThrottledMetrics getTopicThrottledMetrics() { + TopicThrottledMetrics metrics = new TopicThrottledMetrics(); + metrics.setClientType(KafkaClientEnum.PRODUCE_CLIENT); + metrics.setTopicName(REAL_TOPIC_IN_ZK); + metrics.setClusterId(REAL_CLUSTER_ID_IN_MYSQL); + return metrics; + } + + @Test + public void insertBatchTest() { + // dataList为空测试 + Assert.assertEquals(throttleService.insertBatch(new ArrayList<>()), 0); + + // 成功测试 + insertBatch2SuccessTest(); + } + private void insertBatch2SuccessTest() { + TopicThrottledMetricsDO metricsDO = getTopicThrottledMetricsDO(); + List doList = new ArrayList<>(); + doList.add(metricsDO); + + Mockito.when(topicThrottleDao.insertBatch(Mockito.anyList())).thenReturn(3); + Assert.assertTrue(throttleService.insertBatch(doList) > 0); + } + + @Test + public void getTopicThrottleFromDBTest() { + // 返回空集合测试 + getTopicThrottleFromDB2TopicThrottleDOListIsEmptyTest(); + + // 成功测试 + getTopicThrottleFromDB2SuccessTest(); + } + + private void getTopicThrottleFromDB2SuccessTest() { + TopicThrottledMetricsDO metricsDO = getTopicThrottledMetricsDO(); + List metricsDOList = new ArrayList<>(); + metricsDOList.add(metricsDO); + + Mockito.when(topicThrottleDao.getTopicThrottle(Mockito.anyLong(), Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any())).thenReturn(metricsDOList); + Assert.assertTrue(throttleService.getTopicThrottleFromDB(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK, KAFKA_MANAGER_APP_ID, new Date(), new Date()).stream().allMatch(metricsDO1 -> + metricsDO1.getAppId().equals(metricsDO.getAppId()) && + metricsDO1.getClusterId().equals(metricsDO.getClusterId()) && + metricsDO1.getTopicName().equals(metricsDO.getTopicName()))); + + } + + private void getTopicThrottleFromDB2TopicThrottleDOListIsEmptyTest() { + Mockito.when(topicThrottleDao.getAppIdThrottle(Mockito.anyLong(), Mockito.anyString(), Mockito.any(), Mockito.any())).thenReturn(new ArrayList<>()); + Assert.assertTrue(throttleService.getTopicThrottleFromDB(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK, KAFKA_MANAGER_APP_ID, new Date(), new Date()).isEmpty()); + } + + @Test + public void getThrottledTopicsFromJmxTest() { + // 返回空集合测试 + getThrottledTopicsFromJmx2ReturnEmptyTest(); + + // 返回成功测试 + getThrottledTopicsFromJmx2SuccessTest(); + } + + private void getThrottledTopicsFromJmx2ReturnEmptyTest() { + Assert.assertTrue(throttleService.getThrottledTopicsFromJmx(REAL_CLUSTER_ID_IN_MYSQL, REAL_BROKER_ID_SET, new ArrayList<>()).isEmpty()); + } + + private void getThrottledTopicsFromJmx2SuccessTest() { + List clientList = new ArrayList<>(); + clientList.add(KafkaClientEnum.PRODUCE_CLIENT); + + Assert.assertTrue(throttleService.getThrottledTopicsFromJmx(REAL_CLUSTER_ID_IN_MYSQL, REAL_BROKER_ID_SET, clientList).isEmpty()); + } +} diff --git a/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/TopicExpiredServiceTest.java b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/TopicExpiredServiceTest.java new file mode 100644 index 00000000..47b3cc90 --- /dev/null +++ b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/TopicExpiredServiceTest.java @@ -0,0 +1,65 @@ +package com.xiaojukeji.kafka.manager.service.service; + +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicExpiredDO; +import com.xiaojukeji.kafka.manager.dao.TopicExpiredDao; +import com.xiaojukeji.kafka.manager.service.config.BaseTest; +import org.springframework.beans.factory.annotation.Autowired; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.List; + +/** + * @author wyc + * @date 2021/12/20 + */ +public class TopicExpiredServiceTest extends BaseTest { + + @Autowired + private TopicExpiredDao topicExpiredDao; + + @Autowired + private TopicExpiredService topicExpiredService; + + + private TopicExpiredDO getTopicExpiredDO() { + TopicExpiredDO topicExpiredDO = new TopicExpiredDO(); + topicExpiredDO.setClusterId(1L); + topicExpiredDO.setExpiredDay(30); + topicExpiredDO.setTopicName("topic_a"); + topicExpiredDO.setStatus(0); + + return topicExpiredDO; + } + + @Test + public void retainExpiredTopicTest() { + // 参数非法测试 + Assert.assertEquals(topicExpiredService.retainExpiredTopic(1L, "topic_a", -1), ResultStatus.PARAM_ILLEGAL); + + // Topic不存在测试 + Assert.assertEquals(topicExpiredService.retainExpiredTopic(1L, "topicNotExist", 40), ResultStatus.TOPIC_NOT_EXIST); + + // 成功测试 + // 过期Topic插入到topic_expired表中时,会先检查这个Topic是否在这个物理集群中,所以测试基于集群中建立了"topic_a"的topic + topicExpiredDao.replace(getTopicExpiredDO()); + Assert.assertEquals(topicExpiredService.retainExpiredTopic(1L, getTopicExpiredDO().getTopicName(), 40), ResultStatus.SUCCESS); + + } + + + @Test + public void deleteByNameTest() { + // 删除失败 + Assert.assertEquals(topicExpiredService.deleteByTopicName(1L, "notExistTopic"), 0); + + // 删除成功 + // 先在topic_expired表中插入数据,可以插入不存在的topic,因为这个删除只是从数据库中删除,删除的时候并没有检验topic是否存在于集群 + // 根据返回值判断是否删除成功 + TopicExpiredDO topicExpiredDO = getTopicExpiredDO(); + topicExpiredDO.setTopicName("test-topic"); + topicExpiredDao.replace(topicExpiredDO); + Assert.assertEquals(topicExpiredService.deleteByTopicName(getTopicExpiredDO().getClusterId(), "test-topic"), 1); + } +} diff --git a/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerServiceTest.java b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerServiceTest.java new file mode 100644 index 00000000..2da107c1 --- /dev/null +++ b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/TopicManagerServiceTest.java @@ -0,0 +1,752 @@ +package com.xiaojukeji.kafka.manager.service.service; + +import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum; +import com.xiaojukeji.kafka.manager.common.bizenum.TopicAuthorityEnum; +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.ao.RdTopicBasic; +import com.xiaojukeji.kafka.manager.common.entity.ao.topic.MineTopicSummary; +import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicAppData; +import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicBusinessInfo; +import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicDTO; +import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicThrottledMetrics; +import com.xiaojukeji.kafka.manager.common.entity.pojo.*; +import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO; +import com.xiaojukeji.kafka.manager.common.utils.ListUtils; +import com.xiaojukeji.kafka.manager.dao.TopicDao; +import com.xiaojukeji.kafka.manager.dao.TopicExpiredDao; +import com.xiaojukeji.kafka.manager.dao.TopicStatisticsDao; +import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager; +import com.xiaojukeji.kafka.manager.service.config.BaseTest; +import com.xiaojukeji.kafka.manager.service.service.gateway.AppService; +import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.springframework.beans.factory.annotation.Autowired; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.*; + +/** + * @author wyc + * @date 2021/12/21 + */ +public class TopicManagerServiceTest extends BaseTest { + private final static Long REAL_CLUSTER_ID_IN_MYSQL = 1L; + + private final static String REAL_TOPIC_IN_ZK = "topic_a"; + + private final static String ADMIN_NAME_IN_MYSQL = "admin"; + + private final static String KAFKA_MANAGER_APP_NAME = "KM管理员"; + + private final static String KAFKA_MANAGER_APP_ID = "dkm_admin"; + + private final static Set REAL_BROKER_ID_SET = new HashSet<>(); + + private final static String REAL_REGION_IN_CLUSTER = "region1"; + + private final static String REAL_LOGICAL_CLUSTER_NAME = "logical_cluster_1"; + + // 共享集群 + private final static Integer REAL_LOGICAL_CLUSTER_MODE = 0; + + static { + REAL_BROKER_ID_SET.add(1); + REAL_BROKER_ID_SET.add(2); + } + + + @Autowired + @InjectMocks + private TopicManagerService topicManagerService; + + @Mock + private TopicDao topicDao; + + @Mock + private TopicStatisticsDao topicStatisticsDao; + + @Mock + private TopicExpiredDao topicExpiredDao; + + @Mock + private AppService appService; + + @Mock + private ClusterService clusterService; + + @Mock + private AuthorityService authorityService; + + @Mock + private ThrottleService throttleService; + + @Mock + private RegionService regionService; + + @Mock + private LogicalClusterMetadataManager logicalClusterMetadataManager; + + @BeforeMethod + public void init() { + MockitoAnnotations.initMocks(this); + } + + private TopicDO getTopicDO() { + TopicDO topicDO = new TopicDO(); + topicDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL); + topicDO.setTopicName("test_topic"); + topicDO.setAppId("appId"); + topicDO.setPeakBytesIn(100L); + return topicDO; + } + + private TopicDO getTopicDOInCluster() { + // 这个Topic是通过工单手动在物理集群中插入的 + TopicDO topicDO = new TopicDO(); + topicDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL); + topicDO.setTopicName(REAL_TOPIC_IN_ZK); + topicDO.setAppId(KAFKA_MANAGER_APP_ID); + topicDO.setPeakBytesIn(100L); + return topicDO; + } + + private TopicStatisticsDO getTopicStatisticsDO() { + // cluster_id, topic_name, offset_sum, max_avg_bytes_in, gmt_day + TopicStatisticsDO topicStatisticsDO = new TopicStatisticsDO(); + topicStatisticsDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL); + topicStatisticsDO.setTopicName(REAL_TOPIC_IN_ZK); + topicStatisticsDO.setOffsetSum(1L); + topicStatisticsDO.setMaxAvgBytesIn(1.0); + topicStatisticsDO.setGmtDay("2020-03-30"); + return topicStatisticsDO; + } + + private TopicExpiredDO getTopicExpiredDO() { + TopicExpiredDO topicExpiredDO = new TopicExpiredDO(); + return topicExpiredDO; + } + + private AuthorityDO getAuthorityDO() { + AuthorityDO authorityDO = new AuthorityDO(); + authorityDO.setAccess(3); + authorityDO.setAppId(KAFKA_MANAGER_APP_ID); + authorityDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL); + authorityDO.setTopicName(REAL_TOPIC_IN_ZK); + return authorityDO; + } + + private TopicThrottledMetrics getTopicThrottledMetrics() { + TopicThrottledMetrics metrics = new TopicThrottledMetrics(); + metrics.setAppId(KAFKA_MANAGER_APP_ID); + metrics.setClusterId(REAL_CLUSTER_ID_IN_MYSQL); + metrics.setTopicName(REAL_TOPIC_IN_ZK); + metrics.setClientType(KafkaClientEnum.PRODUCE_CLIENT); + metrics.setBrokerIdSet(REAL_BROKER_ID_SET); + return metrics; + } + + private TopicAppData getTopicAppData() { + TopicAppData data = new TopicAppData(); + data.setAccess(3); + data.setAppId(KAFKA_MANAGER_APP_ID); + data.setAppName(KAFKA_MANAGER_APP_NAME); + data.setClusterId(REAL_CLUSTER_ID_IN_MYSQL); + data.setTopicName(REAL_TOPIC_IN_ZK); + data.setAppPrincipals(ADMIN_NAME_IN_MYSQL); + data.setConsumerQuota(15728640L); + data.setProduceQuota(15728640L); + return data; + } + + private ClusterDO getClusterDO() { + ClusterDO clusterDO = new ClusterDO(); + clusterDO.setId(REAL_CLUSTER_ID_IN_MYSQL); + return clusterDO; + } + + private RegionDO getRegionDO() { + RegionDO regionDO = new RegionDO(); + regionDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL); + regionDO.setBrokerList(ListUtils.intList2String(new ArrayList<>(REAL_BROKER_ID_SET))); + regionDO.setName(REAL_REGION_IN_CLUSTER); + return regionDO; + } + + private RdTopicBasic getRdTopicBasic() { + RdTopicBasic rdTopicBasic = new RdTopicBasic(); + rdTopicBasic.setAppId(KAFKA_MANAGER_APP_ID); + rdTopicBasic.setAppName(KAFKA_MANAGER_APP_NAME); + List regionNameList = new ArrayList<>(); + regionNameList.add(REAL_REGION_IN_CLUSTER); + rdTopicBasic.setRegionNameList(regionNameList); + return rdTopicBasic; + } + + private TopicBusinessInfo getTopicBusinessInfo() { + TopicBusinessInfo info = new TopicBusinessInfo(); + info.setAppId(KAFKA_MANAGER_APP_ID); + info.setAppName(KAFKA_MANAGER_APP_NAME); + info.setClusterId(REAL_CLUSTER_ID_IN_MYSQL); + info.setPrincipals(ADMIN_NAME_IN_MYSQL); + info.setTopicName(REAL_TOPIC_IN_ZK); + + return info; + } + + private LogicalClusterDO getLogicalClusterDO() { + LogicalClusterDO logicalClusterDO = new LogicalClusterDO(); + logicalClusterDO.setName(REAL_LOGICAL_CLUSTER_NAME); + logicalClusterDO.setMode(REAL_LOGICAL_CLUSTER_MODE); + logicalClusterDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL); + logicalClusterDO.setAppId(KAFKA_MANAGER_APP_ID); + return logicalClusterDO; + } + + @Test + public void addTopicTest() { + // addTopic只是向数据库topic表中写数据 + // 成功测试 + Mockito.when(topicDao.insert(Mockito.any())).thenReturn(1); + Assert.assertEquals(topicManagerService.addTopic(getTopicDO()), 1); + + // 失败测试,再次插入相同的Topic + Mockito.when(topicDao.insert(Mockito.any())).thenReturn(0); + Assert.assertEquals(topicManagerService.addTopic(getTopicDO()), 0); + } + + + + @Test + public void deleteByTopicNameTest() { + // 删除也只是删除topic表中的数据 + // 删除失败测试,删除一个不存在的 + Mockito.when(topicDao.deleteByName(Mockito.anyLong(), Mockito.anyString())).thenReturn(0); + Assert.assertEquals(topicManagerService.deleteByTopicName(getTopicDO().getClusterId(), "notExistTopic"), 0); + + // 删除成功测试 + Mockito.when(topicDao.deleteByName(Mockito.anyLong(), Mockito.anyString())).thenReturn(1); + Assert.assertEquals(topicManagerService.deleteByTopicName(getTopicDO().getClusterId(), getTopicDO().getTopicName()), 1); + } + + + + @Test(description = "物理集群中不存在该topic时的删除操作") + public void modifyTopic2TopicNotExist() { + Mockito.when(topicDao.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(null); + Assert.assertEquals(topicManagerService.modifyTopic(getTopicDO().getClusterId(), "notExistTopic", null, null), ResultStatus.TOPIC_NOT_EXIST); + } + + @Test(description = "modifyTopic, 成功") + public void modifyTopic2Success() { + TopicDO topicDO = getTopicDOInCluster(); + // 因为会检查集群中是否存在这个Topic,因此直接用KM创建topic,用这个Topic测试 + Mockito.when(topicDao.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(topicDO); + Mockito.when(topicDao.updateByName(Mockito.any())).thenReturn(1); + Assert.assertEquals(topicManagerService.modifyTopic(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK, "更改过", "operator"), ResultStatus.SUCCESS); + } + + + + @Test(description = "modifyTopicByOp, 物理集群中不存在该topic") + public void modifyTopicByOp2TopicNotExistTest() { + Assert.assertEquals(topicManagerService.modifyTopicByOp(getTopicDOInCluster().getClusterId(), "notExistTopic", "dkm_admin", null, "admin"), ResultStatus.TOPIC_NOT_EXIST); + } + + @Test(description = "modifyTopicByOp, 物理集群中不存在传入的appId") + public void modifyTopicByOp2AppNotExistTest() { + Mockito.when(appService.getByAppId(Mockito.anyString())).thenReturn(null); + Assert.assertEquals(topicManagerService.modifyTopicByOp(getTopicDOInCluster().getClusterId(), getTopicDOInCluster().getTopicName(), "notExistAppId", null, "admin"), ResultStatus.APP_NOT_EXIST); + } + + @Test(description = "modifyTopicByOp, 成功测试") + public void modifyTopicByOp2SuccessTest() { + AppDO appDO = getAppDO(); + Mockito.when(appService.getByAppId(Mockito.anyString())).thenReturn(appDO); + Assert.assertEquals(topicManagerService.modifyTopicByOp(getTopicDOInCluster().getClusterId(), getTopicDOInCluster().getTopicName(), getTopicDOInCluster().getAppId(), "无", "admin"), ResultStatus.SUCCESS); + } + + + private void addAuthority2ClusterNotExistTest() { + AuthorityDO authorityDO = getAuthorityDO(); + AppDO appDO = getAppDO(); + Mockito.when(appService.getByAppId(Mockito.anyString())).thenReturn(appDO); + Mockito.when(logicalClusterMetadataManager.getPhysicalClusterId(Mockito.anyLong())).thenReturn(null); + Assert.assertEquals(topicManagerService.addAuthority(authorityDO), ResultStatus.CLUSTER_NOT_EXIST); + } + + + + @Test + public void listAllTest() { + // 准备mock返回数据 + List doList = new ArrayList<>(); + doList.add(getTopicDO()); + topicDao.insert(getTopicDO()); + Mockito.when(topicDao.listAll()).thenReturn(doList); + List topicDOS = topicManagerService.listAll(); + Assert.assertFalse(topicDOS.isEmpty()); + Assert.assertTrue(topicDOS.stream().allMatch(topicDO -> + topicDO.getClusterId().equals(getTopicDO().getClusterId()) && + topicDO.getAppId().equals(getTopicDO().getAppId()) && + topicDO.getTopicName().equals(getTopicDO().getTopicName()))); + } + + @Test + public void getByClusterIdFromCacheTest() { + // 返回空集合测试 + getByClusterIdFromCache2ReturnEmptyListTest(); + + // 返回成功测试 + getByClusterIdFromCache2SuccessTest(); + } + + private void getByClusterIdFromCache2ReturnEmptyListTest() { + Assert.assertTrue(topicManagerService.getByClusterIdFromCache(null).isEmpty()); + } + + private void getByClusterIdFromCache2SuccessTest() { + List doList = new ArrayList<>(); + doList.add(getTopicDO()); + Mockito.when(topicDao.getByClusterIdFromCache(Mockito.anyLong())).thenReturn(doList); + Assert.assertEquals(topicManagerService.getByClusterIdFromCache(REAL_CLUSTER_ID_IN_MYSQL), doList); + } + + + @Test + public void getByClusterIdTest() { + // 返回空集合测试 + getByClusterId2ReturnEmptyListTest(); + + // 返回成功测试 + getByClusterId2SuccessTest(); + } + + private void getByClusterId2ReturnEmptyListTest() { + Assert.assertTrue(topicManagerService.getByClusterId(null).isEmpty()); + } + + private void getByClusterId2SuccessTest() { + List doList = new ArrayList<>(); + doList.add(getTopicDO()); + Mockito.when(topicDao.getByClusterId(Mockito.anyLong())).thenReturn(doList); + Assert.assertEquals(topicManagerService.getByClusterId(REAL_CLUSTER_ID_IN_MYSQL), doList); + } + + + + @Test + public void getByTopicNameTest() { + // 返回null测试 + getByTopicName2ReturnNullTest(); + + // 成功测试 + getByTopicName2SuccessTest(); + } + + private void getByTopicName2ReturnNullTest() { + Assert.assertNull(topicManagerService.getByTopicName(null, REAL_TOPIC_IN_ZK)); + Assert.assertNull(topicManagerService.getByTopicName(REAL_CLUSTER_ID_IN_MYSQL, null)); + } + + private void getByTopicName2SuccessTest() { + TopicDO topicDOInCluster = getTopicDOInCluster(); + Mockito.when(topicDao.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(topicDOInCluster); + Assert.assertEquals(topicManagerService.getByTopicName(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK), topicDOInCluster); + } + + @Test + public void replaceTopicStatisticsTest() { + // 失败测试 + Mockito.when(topicStatisticsDao.replace(Mockito.any())).thenReturn(0); + Assert.assertEquals(topicManagerService.replaceTopicStatistics(new TopicStatisticsDO()), 0); + + // 成功测试 + Mockito.when(topicStatisticsDao.replace(Mockito.any())).thenReturn(1); + Assert.assertEquals(topicManagerService.replaceTopicStatistics(new TopicStatisticsDO()), 1); + } + + @Test + public void getTopicMaxAvgBytesInTest() { + // 返回空Map测试 + getTopicMaxAvgBytesIn2ReturnEmptyMapTest(); + + // 返回成功测试 + getTopicMaxAvgBytesIn2SuccessTest(); + } + + private void getTopicMaxAvgBytesIn2ReturnEmptyMapTest() { + List doList = new ArrayList<>(); + Mockito.when(topicStatisticsDao.getTopicStatisticData(Mockito.anyLong(), Mockito.any(), Mockito.anyDouble())).thenReturn(doList); + Assert.assertTrue(topicManagerService.getTopicMaxAvgBytesIn(REAL_CLUSTER_ID_IN_MYSQL,1, 1.0).size() == 0); + } + + private void getTopicMaxAvgBytesIn2SuccessTest() { + List doList = new ArrayList<>(); + TopicStatisticsDO topicStatisticsDO = getTopicStatisticsDO(); + doList.add(topicStatisticsDO); + Mockito.when(topicStatisticsDao.getTopicStatisticData(Mockito.anyLong(), Mockito.any(), Mockito.anyDouble())).thenReturn(doList); + + Map> expectMap = new HashMap<>(); + List list = new ArrayList<>(); + list.add(topicStatisticsDO.getMaxAvgBytesIn()); + expectMap.put(topicStatisticsDO.getTopicName(), list); + + Map> actualMap = topicManagerService.getTopicMaxAvgBytesIn(REAL_CLUSTER_ID_IN_MYSQL, 1, 1.0); + Assert.assertTrue(actualMap.keySet().stream().allMatch(key -> actualMap.get(key).equals(expectMap.get(key)))); + } + + @Test + public void getTopicMaxAvgBytesInTest2() { + // 返回空测试 + getTopicMaxAvgBytesInTest2NullTest(); + // 成功测试 + getTopicMaxAvgBytesInTest2SuccessTest(); + } + + private void getTopicMaxAvgBytesInTest2NullTest() { + Mockito.when(topicStatisticsDao.getTopicMaxAvgBytesIn(Mockito.anyLong(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyInt())).thenReturn(null); + Assert.assertEquals(topicManagerService.getTopicMaxAvgBytesIn(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK, new Date(), new Date(), 1), null); + } + + private void getTopicMaxAvgBytesInTest2SuccessTest() { + Mockito.when(topicStatisticsDao.getTopicMaxAvgBytesIn(Mockito.anyLong(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyInt())).thenReturn(1.0); + Assert.assertEquals(topicManagerService.getTopicMaxAvgBytesIn(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK, new Date(), new Date(), 1), 1.0); + } + + @Test + public void getByTopicAndDayTest() { + TopicStatisticsDO topicStatisticsDO = getTopicStatisticsDO(); + Mockito.when(topicStatisticsDao.getByTopicAndDay(Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).thenReturn(topicStatisticsDO); + Assert.assertEquals(topicManagerService.getByTopicAndDay(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK, "2020-03-30"), topicStatisticsDO); + } + + @Test + public void getExpiredTopicsTest() { + Mockito.when(topicExpiredDao.getExpiredTopics(Mockito.anyInt())).thenReturn(new ArrayList<>()); + Assert.assertTrue(topicManagerService.getExpiredTopics(30).isEmpty()); + } + + + + + private MineTopicSummary getMineTopicSummary() { + MineTopicSummary summary = new MineTopicSummary(); + summary.setAppName(KAFKA_MANAGER_APP_NAME); + summary.setAccess(TopicAuthorityEnum.OWNER.getCode()); + summary.setPhysicalClusterId(1L); + summary.setTopicName(REAL_TOPIC_IN_ZK); + return summary; + } + + private TopicDO getRealTopicDO() { + TopicDO topicDO = new TopicDO(); + topicDO.setAppId(KAFKA_MANAGER_APP_ID); + topicDO.setTopicName(REAL_TOPIC_IN_ZK); + topicDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL); + topicDO.setDescription("topic介绍"); + topicDO.setAppId(KAFKA_MANAGER_APP_ID); + return topicDO; + } + + private AppDO getAppDO() { + AppDO appDO = new AppDO(); + appDO.setAppId(KAFKA_MANAGER_APP_ID); + appDO.setName(KAFKA_MANAGER_APP_NAME); + appDO.setPrincipals(ADMIN_NAME_IN_MYSQL); + appDO.setType(1); + return appDO; + } + + @Test + public void getMyTopicsTest() { + MineTopicSummary mineTopicSummary = getMineTopicSummary(); + List topicDOList = new ArrayList<>(); + TopicDO realTopicDO = getRealTopicDO(); + topicDOList.add(realTopicDO); + + List appDOList = new ArrayList<>(); + AppDO appDO = getAppDO(); + appDOList.add(appDO); + + Mockito.when(topicDao.listAll()).thenReturn(topicDOList); + Mockito.when(topicDao.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(realTopicDO); + Mockito.when(appService.getByPrincipal(Mockito.anyString())).thenReturn(appDOList); + + Assert.assertTrue(topicManagerService.getMyTopics(ADMIN_NAME_IN_MYSQL).stream().allMatch(summary -> + summary.getAccess().equals(mineTopicSummary.getAccess()) && + summary.getAppName().equals(mineTopicSummary.getAppName()) && + summary.getTopicName().equals(mineTopicSummary.getTopicName()))); + } + + + @Test + public void getTopicsTest() { + // ClusterDO为空测试 + getTopic2clusterDOListIsEmptyTest(); + + // appList为空测试 + getTopic2AppListIsEmptyTest(); + + // 成功测试 + getTopic2SuccessTest(); + } + + + private TopicDTO getTopicDTO() { + TopicDTO topicDTO = new TopicDTO(); + topicDTO.setAppId(KAFKA_MANAGER_APP_ID); + topicDTO.setAppName(KAFKA_MANAGER_APP_NAME); + topicDTO.setAppPrincipals(ADMIN_NAME_IN_MYSQL); + topicDTO.setTopicName(REAL_TOPIC_IN_ZK); + return topicDTO; + } + + + private void getTopic2clusterDOListIsEmptyTest() { + Mockito.when(clusterService.listAll()).thenReturn(new ArrayList<>()); + Assert.assertTrue(topicManagerService.getTopics(ADMIN_NAME_IN_MYSQL).isEmpty()); + } + + private void getTopic2AppListIsEmptyTest() { + List clusterDOList = new ArrayList<>(); + clusterDOList.add(new ClusterDO()); + Mockito.when(clusterService.listAll()).thenReturn(clusterDOList); + Mockito.when(appService.listAll()).thenReturn(new ArrayList<>()); + Assert.assertTrue(topicManagerService.getTopics(ADMIN_NAME_IN_MYSQL).isEmpty()); + } + + private void getTopic2SuccessTest() { + List clusterDOList = new ArrayList<>(); + ClusterDO clusterDO = getClusterDO(); + clusterDOList.add(clusterDO); + + List appDOList = new ArrayList<>(); + AppDO appDO = getAppDO(); + appDOList.add(appDO); + + TopicDTO topicDTO = getTopicDTO(); + + LogicalClusterDO logicalClusterDO = getLogicalClusterDO(); + + Mockito.when(clusterService.listAll()).thenReturn(clusterDOList); + Mockito.when(appService.listAll()).thenReturn(appDOList); + Mockito.when(logicalClusterMetadataManager.getTopicLogicalCluster(Mockito.anyLong(), Mockito.anyString())).thenReturn(logicalClusterDO); + Assert.assertTrue(topicManagerService.getTopics(ADMIN_NAME_IN_MYSQL).stream().allMatch(dto -> + dto.getAppId().equals(topicDTO.getAppId()) && + dto.getAppName().equals(topicDTO.getAppName()) && + dto.getTopicName().equals(topicDTO.getTopicName()))); + + } + + @Test + public void getTopicAuthorizedAppsTest() { + // Topic不存在测试 + getTopicAuthorizedApps2TopicNotExistTest(); + + // 没有权限测试 + getTopicAuthorizedApps2NoAuthorityTest(); + + // 成功测试 + getTopicAuthorizedApps2successTest(); + + } + + private void getTopicAuthorizedApps2TopicNotExistTest() { + Assert.assertTrue(topicManagerService.getTopicAuthorizedApps(-1L, "notExistTopic").isEmpty()); + } + + private void getTopicAuthorizedApps2NoAuthorityTest() { + Mockito.when(authorityService.getAuthorityByTopic(Mockito.anyLong(), Mockito.anyString())).thenReturn(new ArrayList<>()); + Assert.assertTrue(topicManagerService.getTopicAuthorizedApps(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK).isEmpty()); + } + + private void getTopicAuthorizedApps2successTest() { + AuthorityDO authorityDO = getAuthorityDO(); + List authorityDOList = new ArrayList<>(); + authorityDOList.add(authorityDO); + Mockito.when(authorityService.getAuthorityByTopic(Mockito.anyLong(), Mockito.anyString())).thenReturn(authorityDOList); + + List topicThrottledMetricsList = new ArrayList<>(); + TopicThrottledMetrics topicThrottledMetrics = getTopicThrottledMetrics(); + topicThrottledMetricsList.add(topicThrottledMetrics); + Mockito.when(throttleService.getThrottledTopicsFromJmx(Mockito.anyLong(), Mockito.anySet(), Mockito.anyList())).thenReturn(topicThrottledMetricsList); + + AppDO appDO = getAppDO(); + Mockito.when(appService.getByAppId(Mockito.anyString())).thenReturn(appDO); + + TopicAppData topicAppData = getTopicAppData(); + + Assert.assertTrue(topicManagerService.getTopicAuthorizedApps(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK).stream().allMatch(data -> + data.getAccess().equals(topicAppData.getAccess()) && + data.getAppId().equals(topicAppData.getAppId()) && + data.getTopicName().equals(topicAppData.getTopicName()) && + data.getAppName().equals(topicAppData.getAppName()) && + data.getAppPrincipals().equals(topicAppData.getAppPrincipals()))); + } + + + @Test + public void getTopicMineAppsTest() { + // topic不存在测试 + getTopicMineApps2TopicNotExistTest(); + + // appDOList为空测试 + getTopicMineApps2AppDOListIsEmptyTest(); + + // 成功测试 + getTopicMineApps2Success(); + } + + private void getTopicMineApps2TopicNotExistTest() { + Assert.assertTrue(topicManagerService.getTopicMineApps(-1L, "notExistTopic", ADMIN_NAME_IN_MYSQL).isEmpty()); + } + + private void getTopicMineApps2AppDOListIsEmptyTest() { + Mockito.when(appService.getByPrincipal(Mockito.anyString())).thenReturn(new ArrayList<>()); + Assert.assertTrue(topicManagerService.getTopicMineApps(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK, ADMIN_NAME_IN_MYSQL).isEmpty()); + } + + private void getTopicMineApps2Success() { + AppDO appDO = getAppDO(); + List appDOList = new ArrayList<>(); + appDOList.add(appDO); + Mockito.when(appService.getByPrincipal(Mockito.anyString())).thenReturn(appDOList); + + AuthorityDO authorityDO = getAuthorityDO(); + List authorityDOList = new ArrayList<>(); + authorityDOList.add(authorityDO); + Mockito.when(authorityService.getAuthorityByTopic(Mockito.anyLong(), Mockito.anyString())).thenReturn(authorityDOList); + + List topicThrottledMetricsList = new ArrayList<>(); + TopicThrottledMetrics topicThrottledMetrics = getTopicThrottledMetrics(); + topicThrottledMetricsList.add(topicThrottledMetrics); + Mockito.when(throttleService.getThrottledTopicsFromJmx(Mockito.anyLong(), Mockito.anySet(), Mockito.anyList())).thenReturn(topicThrottledMetricsList); + + System.out.println(topicManagerService.getTopicMineApps(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK, ADMIN_NAME_IN_MYSQL)); + TopicAppData topicAppData = getTopicAppData(); + Assert.assertTrue(topicManagerService.getTopicMineApps(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK, ADMIN_NAME_IN_MYSQL).stream().allMatch(data -> + data.getAppName().equals(topicAppData.getAppName()) && + data.getTopicName().equals(topicAppData.getTopicName()) && + data.getConsumerQuota().equals(topicAppData.getConsumerQuota()))); + } + + + @Test + public void getRdTopicBasicTest() { + // 集群不存在测试 + getRdTopicBasic2ClusterNotExistTest(); + + // 成功测试 + getRdTopicBasic2SuccessTest(); + } + + private void getRdTopicBasic2ClusterNotExistTest() { + Mockito.when(clusterService.getById(Mockito.anyLong())).thenReturn(null); + Assert.assertEquals(topicManagerService.getRdTopicBasic(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK).toString(), Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST).toString()); + } + + private void getRdTopicBasic2SuccessTest() { + ClusterDO clusterDO = getClusterDO(); + Mockito.when(clusterService.getById(REAL_CLUSTER_ID_IN_MYSQL)).thenReturn(clusterDO); + + RegionDO regionDO = getRegionDO(); + List regionDOList = new ArrayList<>(); + regionDOList.add(regionDO); + Mockito.when(regionService.getRegionListByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(regionDOList); + + AppDO appDO = getAppDO(); + Mockito.when(appService.getByAppId(Mockito.anyString())).thenReturn(appDO); + + TopicDO topicDO = getTopicDOInCluster(); + Mockito.when(topicDao.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(topicDO); + + RdTopicBasic actualResult = topicManagerService.getRdTopicBasic(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK).getData(); + RdTopicBasic expectedResult = getRdTopicBasic(); + Assert.assertNotNull(actualResult); + Assert.assertTrue(actualResult.getAppId().equals(expectedResult.getAppId()) && + actualResult.getAppName().equals(expectedResult.getAppName()) && + actualResult.getRegionNameList().equals(expectedResult.getRegionNameList())); + + } + + @Test + public void getTopicBusinessInfoTest() { + // TopicDO为null测试 + getRdTopicBasic2SuccessTest(); + + // AppDO为null测试 + getTopicBusinessInfo2AppDOIsNullTest(); + + // 成功测试 + getTopicBusinessInfo2SuccessTest(); + } + + private void getTopicBusinessInfo2ReturnNullTest() { + Mockito.when(topicDao.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(null); + Assert.assertNull(topicManagerService.getTopicBusinessInfo(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK)); + } + + private void getTopicBusinessInfo2AppDOIsNullTest() { + TopicDO topicDO = getTopicDOInCluster(); + Mockito.when(topicDao.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(topicDO); + Mockito.when(appService.getByAppId(Mockito.anyString())).thenReturn(null); + + TopicBusinessInfo expected = getTopicBusinessInfo(); + expected.setAppId(null); + expected.setAppName(null); + expected.setPrincipals(null); + Assert.assertEquals(topicManagerService.getTopicBusinessInfo(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK).toString(), expected.toString()); + } + + private void getTopicBusinessInfo2SuccessTest() { + TopicDO topicDO = getTopicDOInCluster(); + Mockito.when(topicDao.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(topicDO); + + AppDO appDO = getAppDO(); + Mockito.when(appService.getByAppId(Mockito.anyString())).thenReturn(appDO); + + TopicBusinessInfo expected = getTopicBusinessInfo(); + Assert.assertEquals(topicManagerService.getTopicBusinessInfo(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK).toString(), expected.toString()); + } + + @Test + public void getTopicStatisticTest() { + TopicStatisticsDO topicStatisticsDO = getTopicStatisticsDO(); + List topicStatisticsDOList = new ArrayList<>(); + topicStatisticsDOList.add(topicStatisticsDO); + + Mockito.when(topicStatisticsDao.getTopicStatistic(Mockito.anyLong(), Mockito.anyString(), Mockito.any(), Mockito.any())).thenReturn(topicStatisticsDOList); + Assert.assertTrue(topicManagerService.getTopicStatistic(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK, new Date(), new Date()).stream().allMatch(statisticsDO -> + statisticsDO.getClusterId().equals(topicStatisticsDO.getClusterId()) && + statisticsDO.getMaxAvgBytesIn().equals(topicStatisticsDO.getMaxAvgBytesIn()) && + statisticsDO.getOffsetSum().equals(topicStatisticsDO.getOffsetSum()))); + } + + @Test + public void addAuthorityTest() { + // app不存在测试 + addAuthority2AppNotExistTest(); + + // cluster不存在测试 +// addAuthority2ClusterNotExistTest(); + + } + + private void addAuthority2AppNotExistTest() { + AuthorityDO authorityDO = getAuthorityDO(); +// Mockito.when(appService.getByPrincipal(Mockito.anyString())).thenReturn(new ArrayList<>()); + Assert.assertEquals(topicManagerService.addAuthority(authorityDO), ResultStatus.APP_NOT_EXIST); + } + + + + + +}