ConfigService,OperateRecordService,RegionService,ThrottleService,TopicExpiredService,TopicManagerService接口下的单元测试方法

This commit is contained in:
didi
2021-12-27 14:55:35 +08:00
parent 21904a8609
commit ecf6e8f664
6 changed files with 2013 additions and 2 deletions

View File

@@ -1,11 +1,460 @@
package com.xiaojukeji.kafka.manager.service.service;
import com.alibaba.fastjson.JSON;
import com.xiaojukeji.kafka.manager.common.constant.ConfigConstant;
import com.xiaojukeji.kafka.manager.common.constant.TopicCreationConstant;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.config.CreateTopicConfig;
import com.xiaojukeji.kafka.manager.common.entity.ao.config.CreateTopicElemConfig;
import com.xiaojukeji.kafka.manager.common.entity.ao.config.expert.TopicExpiredConfig;
import com.xiaojukeji.kafka.manager.common.entity.dto.config.ConfigDTO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ConfigDO;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
/**
* @author xuguang
* @Date 2021/12/6
* @author wyc
* @date 2021/12/9
*/
public class ConfigServiceTest extends BaseTest {
@Autowired
private ConfigService configService;
@DataProvider(name = "configDTO")
public Object[][] provideConfigDO() {
ConfigDTO dto = new ConfigDTO();
dto.setConfigKey("key1");
dto.setConfigValue("value1");
dto.setConfigDescription("test");
return new Object[][] {{dto}};
}
public ConfigDTO getConfigDTO() {
ConfigDTO dto = new ConfigDTO();
dto.setConfigKey("key1");
dto.setConfigValue("value1");
dto.setConfigDescription("test");
return dto;
}
@Test(dataProvider = "configDTO")
public void insertTest(ConfigDTO dto) {
// 插入时MySQL错误
insert2MySQLErrorTest(dto);
// 插入成功测试
insert2SuccessTest(dto);
// 插入时,资源已存在测试
insert2ResourceExistedTest(dto);
}
private void insert2SuccessTest(ConfigDTO dto) {
dto.setConfigKey("key1");
ResultStatus result = configService.insert(dto);
Assert.assertEquals(result, ResultStatus.SUCCESS);
}
private void insert2ResourceExistedTest(ConfigDTO dto2) {
ResultStatus result2 = configService.insert(dto2);
Assert.assertEquals(result2, ResultStatus.RESOURCE_ALREADY_EXISTED);
}
private void insert2MySQLErrorTest(ConfigDTO dto) {
dto.setConfigKey(null);
ResultStatus result = configService.insert(dto);
Assert.assertEquals(result, ResultStatus.MYSQL_ERROR);
}
@Test
public void deleteByKeyTest() {
// deleteByKey, key时null
deleteByKey2NullTest();
// deleteByKey, 配置不存在测试
deleteByKey2ConfigNotExistTest();
// deleteByKey, 成功测试
deleteByKey2SuccessTest();
}
private void deleteByKey2NullTest() {
ResultStatus result = configService.deleteByKey(null);
Assert.assertEquals(result, ResultStatus.PARAM_ILLEGAL);
}
private void deleteByKey2SuccessTest() {
ConfigDTO dto = getConfigDTO();
ResultStatus insertResult = configService.insert(dto);
Assert.assertEquals(insertResult, ResultStatus.SUCCESS);
ResultStatus deleteResult = configService.deleteByKey(dto.getConfigKey());
Assert.assertEquals(deleteResult, ResultStatus.SUCCESS);
}
private void deleteByKey2ConfigNotExistTest() {
ResultStatus result = configService.deleteByKey("key");
Assert.assertEquals(result, ResultStatus.CONFIG_NOT_EXIST);
}
@Test(dataProvider = "configDTO")
public void updateByKeyTest(ConfigDTO dto) {
configService.insert(dto);
// updateByKey, 成功测试
updateByKey2SuccessTest(dto);
// updateByKey, 配置不存在测试
updateByKey2ConfigNotExistTest(dto);
}
private void updateByKey2SuccessTest(ConfigDTO dto) {
dto.setConfigValue("newValue");
ResultStatus updateResult = configService.updateByKey(dto);
Assert.assertEquals(updateResult, ResultStatus.SUCCESS);
}
@Test(dataProvider = "configDTO", description = "updateByKey, 配置不存在测试")
private void updateByKey2ConfigNotExistTest(ConfigDTO dto) {
dto.setConfigKey("newKey");
ResultStatus updateResult = configService.updateByKey(dto);
Assert.assertEquals(updateResult, ResultStatus.CONFIG_NOT_EXIST);
}
// @Test(dataProvider = "configDTO", description = "updateByKey, MySQL_ERROR测试")
// public void updateByKey2MySQLErrorTest(ConfigDTO dto) {
// dto.setConfigKey(null);
// ResultStatus updateResult = configService.updateByKey(dto);
// Assert.assertEquals(updateResult, ResultStatus.CONFIG_NOT_EXIST);
// }
@Test(dataProvider = "configDTO")
public void updateByKeyTest2(ConfigDTO dto) {
configService.insert(dto);
// updateByKey重载方法成功测试
updateByKey2SuccessTest1(dto);
// updateByKey重载方法资源不存在测试
updateByKey2ConfigNotExistTest1(dto);
}
private void updateByKey2SuccessTest1(ConfigDTO dto) {
String key = dto.getConfigKey();
String value = "newValue";
Assert.assertEquals(configService.updateByKey(key, value), ResultStatus.SUCCESS);
}
private void updateByKey2ConfigNotExistTest1(ConfigDTO dto) {
Assert.assertEquals(configService.updateByKey("key2", "newValue"), ResultStatus.CONFIG_NOT_EXIST);
}
@Test(dataProvider = "configDTO")
public void getByKeyTest(ConfigDTO dto) {
configService.insert(dto);
// getByKey, 成功测试
getByKey2SuccessTest(dto);
// getByKey, 获取失败测试
getByKey2NullTest();
}
private void getByKey2SuccessTest(ConfigDTO dto) {
ConfigDO result = configService.getByKey(dto.getConfigKey());
Assert.assertNotNull(result);
Assert.assertTrue(result.getConfigKey().equals(dto.getConfigKey()) &&
result.getConfigValue().equals(dto.getConfigValue()) &&
result.getConfigDescription().equals(dto.getConfigDescription()));
}
private void getByKey2NullTest() {
Assert.assertNull(configService.getByKey("key2"));
}
@Test(dataProvider = "configDTO")
public void getByKeyTest2(ConfigDTO dto) {
// 需要用到TopicExpiredConfig类
TopicExpiredConfig config = getTopicExpiredConfig();
dto.setConfigValue(JSON.toJSONString(config));
Assert.assertEquals(configService.insert(dto), ResultStatus.SUCCESS);
// getByKey, 成功测试
getByKey2SuccessTest1(dto);
// getByKey, 返回null测试
getByKey2NullTest1(dto);
}
private TopicExpiredConfig getTopicExpiredConfig() {
TopicExpiredConfig config = new TopicExpiredConfig();
List<Long> list = new ArrayList<>();
list.add(1L);
list.add(2L);
config.setIgnoreClusterIdList(list);
return config;
}
private void getByKey2SuccessTest1(ConfigDTO dto) {
TopicExpiredConfig result = configService.getByKey(dto.getConfigKey(), TopicExpiredConfig.class);
Assert.assertEquals(result.toString(), getTopicExpiredConfig().toString());
}
private void getByKey2NullTest1(ConfigDTO dto) {
Assert.assertNull(configService.getByKey("key", TopicExpiredConfig.class));
}
@Test(dataProvider = "configDTO")
public void getArrayByKeyTest(ConfigDTO dto) {
dto.setConfigValue(JSON.toJSONString(getStringArray()));
Assert.assertEquals(configService.insert(dto), ResultStatus.SUCCESS);
// getArrayByKey 成功测试
getArrayByKey2SuccessTest(dto);
// getArrayByKey 返回null测试
getArrayByKey2NullTest();
}
private List<String> getStringArray() {
List<String> list = new ArrayList<>();
list.add("value1");
list.add("value2");
list.add("value3");
return list;
}
private void getArrayByKey2SuccessTest(ConfigDTO dto) {
List<String> result = configService.getArrayByKey(dto.getConfigKey(), String.class);
Assert.assertEquals(result, getStringArray());
}
private void getArrayByKey2NullTest() {
Assert.assertNull(configService.getArrayByKey(null, String.class));
}
@Test(dataProvider = "configDTO", description = "getLongValue, 成功测试")
public void getLongValue2SuccessTest(ConfigDTO dto) {
dto.setConfigValue("100");
Assert.assertEquals(configService.insert(dto), ResultStatus.SUCCESS);
Assert.assertEquals(configService.getLongValue(dto.getConfigKey(), 0L), Long.valueOf(dto.getConfigValue()));
}
@Test(description = "getLongValue, 不存在key返回默认值测试")
public void getLongValue2NotExistTest() {
Assert.assertEquals(configService.getLongValue("key", 100L), Long.valueOf(100L));
}
@Test(dataProvider = "configDTO", description = "getLongValue, 存在key但是value是null")
public void getLongValue2ValueIsNull(ConfigDTO dto) {
dto.setConfigValue(null);
Assert.assertEquals(configService.insert(dto), ResultStatus.SUCCESS);
Assert.assertEquals(configService.getLongValue(dto.getConfigKey(), 100L), Long.valueOf(100L));
}
@Test(dataProvider = "configDTO", description = "listAll, 成功测试")
public void listAll2SuccessTest(ConfigDTO dto) {
Assert.assertEquals(configService.insert(dto), ResultStatus.SUCCESS);
List<ConfigDO> result = configService.listAll();
Assert.assertNotNull(result);
List<ConfigDTO> list = new ArrayList<>();
list.add(dto);
// 判断key字段是否相同
Assert.assertEquals(result.stream().map(ConfigDO::getConfigKey).collect(Collectors.toList()),
list.stream().map(ConfigDTO::getConfigKey).collect(Collectors.toList()));
Assert.assertEquals(result.stream().map(ConfigDO::getConfigValue).collect(Collectors.toList()),
list.stream().map(ConfigDTO::getConfigValue).collect(Collectors.toList()));
}
public CreateTopicConfig getCreateTopicConfig() {
return new CreateTopicConfig();
}
public ConfigDTO getConfigDTO1() {
ConfigDTO dto = new ConfigDTO();
dto.setConfigKey(TopicCreationConstant.INNER_CREATE_TOPIC_CONFIG_KEY);
dto.setConfigValue(JSON.toJSONString(getCreateTopicConfig()));
dto.setConfigDescription("test");
return dto;
}
@Test(description = "getAutoPassedTopicApplyOrderNumPerTask, config表中不存在INNER_CREATE_TOPIC_CONFIG_KEY" +
"对应的记录,返回默认值测试")
public void getAutoPassedTopicApplyOrderNumPerTask2NotExistTest() {
Assert.assertEquals(configService.getAutoPassedTopicApplyOrderNumPerTask(), TopicCreationConstant.DEFAULT_MAX_PASSED_ORDER_NUM_PER_TASK);
}
@Test(description = "getAutoPassedTopicApplyOrderNumPerTask, 查到的记录中记录的maxPassedOrderNumPerTask属性为null测试")
public void getAutoPassedTopicApplyOrderNumPerTask2NullTest() {
configService.insert(getConfigDTO1());
Assert.assertEquals(configService.getAutoPassedTopicApplyOrderNumPerTask(), TopicCreationConstant.DEFAULT_MAX_PASSED_ORDER_NUM_PER_TASK);
}
@Test(description = "getAutoPassedTopicApplyOrderNumPerTask, 查到的记录中记录的maxPassedOrderNumPerTask" +
"比TopicCreationConstant.MAX_PASSED_ORDER_NUM_PER_TASK大时测试")
public void getAutoPassedTopicApplyOrderNumPerTask2BiggerMaxTest() {
ConfigDTO configDTO = getConfigDTO1();
CreateTopicConfig createTopicConfig = getCreateTopicConfig();
createTopicConfig.setMaxPassedOrderNumPerTask(TopicCreationConstant.MAX_PASSED_ORDER_NUM_PER_TASK + 10);
configDTO.setConfigValue(JSON.toJSONString(createTopicConfig));
configService.insert(configDTO);
Assert.assertEquals(configService.getAutoPassedTopicApplyOrderNumPerTask(), TopicCreationConstant.MAX_PASSED_ORDER_NUM_PER_TASK);
}
@Test(description = "getAutoPassedTopicApplyOrderNumPerTask, 查到的记录中记录的maxPassedOrderNumPerTask" +
"比TopicCreationConstant.MAX_PASSED_ORDER_NUM_PER_TASK小时测试")
public void getAutoPassedTopicApplyOrderNumPerTask2SmallerMaxTest() {
ConfigDTO configDTO = getConfigDTO1();
CreateTopicConfig createTopicConfig = getCreateTopicConfig();
int val = TopicCreationConstant.MAX_PASSED_ORDER_NUM_PER_TASK - 10;
createTopicConfig.setMaxPassedOrderNumPerTask(val);
configDTO.setConfigValue(JSON.toJSONString(createTopicConfig));
configService.insert(configDTO);
Assert.assertEquals(configService.getAutoPassedTopicApplyOrderNumPerTask(), Integer.valueOf(val));
}
public CreateTopicElemConfig getCreateTopicElemConfig(Long clusterId) {
CreateTopicElemConfig config = new CreateTopicElemConfig();
config.setClusterId(clusterId);
config.setBrokerIdList(new ArrayList<>());
config.setRegionIdList(new ArrayList<>());
config.setPartitionNum(TopicCreationConstant.DEFAULT_PARTITION_NUM);
config.setReplicaNum(TopicCreationConstant.DEFAULT_REPLICA);
config.setRetentionTimeUnitHour(TopicCreationConstant.DEFAULT_RETENTION_TIME_UNIT_HOUR);
config.setAutoExecMaxPeakBytesInUnitB(TopicCreationConstant.AUTO_EXEC_MAX_BYTES_IN_UNIT_B);
return config;
}
@Test(description = "getCreateTopicConfig, config表中不存在key时测试")
public void getCreateTopicConfig2NotExistKeyTest() {
CreateTopicElemConfig createTopicElemConfig = getCreateTopicElemConfig(10L);
Assert.assertEquals(configService.getCreateTopicConfig(10L, "systemCode").toString(), createTopicElemConfig.toString());
}
@Test(description = "getCreateTopicConfig, value中存在和clusterId一致的记录")
public void getCreateTopicConfig2ExistTest() {
CreateTopicElemConfig createTopicElemConfig = getCreateTopicElemConfig(10L);
createTopicElemConfig.setReplicaNum(4);
List<CreateTopicElemConfig> list = new ArrayList<>();
list.add(createTopicElemConfig);
CreateTopicConfig createTopicConfig = getCreateTopicConfig();
createTopicConfig.setConfigList(list);
ConfigDTO configDTO = getConfigDTO1();
configDTO.setConfigValue(JSON.toJSONString(createTopicConfig));
configService.insert(configDTO);
Assert.assertEquals(configService.getCreateTopicConfig(10L, "systemCode").toString(), createTopicElemConfig.toString());
}
@Test(description = "getCreateTopicConfig, value中不存在和clusterId一致的记录")
public void getCreateTopicConfig2NotExitConfigEleTest() {
CreateTopicElemConfig createTopicElemConfig = getCreateTopicElemConfig(11L);
createTopicElemConfig.setReplicaNum(4);
List<CreateTopicElemConfig> list = new ArrayList<>();
list.add(createTopicElemConfig);
CreateTopicConfig createTopicConfig = getCreateTopicConfig();
createTopicConfig.setConfigList(list);
ConfigDTO configDTO = getConfigDTO1();
configDTO.setConfigValue(JSON.toJSONString(createTopicConfig));
configService.insert(configDTO);
Assert.assertEquals(configService.getCreateTopicConfig(10L, "systemCode").toString(), getCreateTopicElemConfig(10L).toString());
}
public ConfigDTO getConfigDTO2() {
ConfigDTO dto = new ConfigDTO();
dto.setConfigKey(ConfigConstant.KAFKA_CLUSTER_DO_CONFIG_KEY);
dto.setConfigDescription("test");
return dto;
}
public ConfigDO getConfigDO() {
return new ConfigDO();
}
@Test(description = "getClusterDO, config表中不存在ConfigConstant.KAFKA_CLUSTER_DO_CONFIG_KEY这个key")
public void getClusterDO2NotExistKeyTest() {
Assert.assertNull(configService.getClusterDO(10L));
}
@Test(description = "getClusterDO, config表中key对应的value没法解析成ConfigDO测试")
public void getClusterDO2ParseFailTest() {
ConfigDTO configDTO2 = getConfigDTO2();
configDTO2.setConfigValue("value");
configService.insert(configDTO2);
Assert.assertNull(configService.getClusterDO(10L));
}
public List<ClusterDO> getClusterDOList() {
ClusterDO clusterDO1 = new ClusterDO();
clusterDO1.setId(10L);
clusterDO1.setClusterName("test1");
ClusterDO clusterDO2 = new ClusterDO();
clusterDO2.setId(20L);
clusterDO2.setClusterName("test2");
List<ClusterDO> list = new ArrayList<>();
list.add(clusterDO1);
list.add(clusterDO2);
return list;
}
@Test(description = "getClusterDO, 成功查到测试")
public void getClusterDO2SuccessTest() {
ConfigDTO configDTO2 = getConfigDTO2();
List<ClusterDO> clusterDOList = getClusterDOList();
configDTO2.setConfigValue(JSON.toJSONString(clusterDOList));
configService.insert(configDTO2);
ClusterDO clusterDO = new ClusterDO();
clusterDO.setId(20L);
clusterDO.setClusterName("test2");
Assert.assertEquals(configService.getClusterDO(20L), clusterDO);
}
}

View File

@@ -0,0 +1,149 @@
package com.xiaojukeji.kafka.manager.service.service;
import com.xiaojukeji.kafka.manager.common.bizenum.ModuleEnum;
import com.xiaojukeji.kafka.manager.common.bizenum.OperateEnum;
import com.xiaojukeji.kafka.manager.common.entity.dto.rd.OperateRecordDTO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.OperateRecordDO;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* @author wyc
* @date 2021/12/8
*/
public class OperateRecordServiceTest extends BaseTest {
@Autowired
private OperateRecordService operateRecordService;
@DataProvider(name = "operateRecordDO")
public Object[][] provideOperateRecordDO() {
OperateRecordDO operateRecordDO = new OperateRecordDO();
operateRecordDO.setId(3L);
// 0:topic, 1:应用, 2:配额, 3:权限, 4:集群, 5:分区, 6:Gateway配置, -1:未知
operateRecordDO.setModuleId(ModuleEnum.CLUSTER.getCode());
// 0:新增, 1:删除, 2:修改
operateRecordDO.setOperateId(OperateEnum.ADD.getCode());
// topic名称、app名称
operateRecordDO.setResource("testOpRecord");
operateRecordDO.setContent("testContent");
operateRecordDO.setOperator("admin");
return new Object[][] {{operateRecordDO}};
}
private OperateRecordDTO getOperateRecordDTO() {
OperateRecordDTO dto = new OperateRecordDTO();
dto.setModuleId(ModuleEnum.CLUSTER.getCode());
dto.setOperateId(OperateEnum.ADD.getCode());
dto.setOperator("admin");
return dto;
}
@Test(dataProvider = "operateRecordDO", description = "插入操作记录成功测试")
public void insert2SuccessTest(OperateRecordDO operateRecordDO) {
int result = operateRecordService.insert(operateRecordDO);
Assert.assertEquals(result, 1);
}
// @Test(dataProvider = "operateRecordDO", description = "插入操作记录失败测试")
// public void insert2FailureTest(OperateRecordDO operateRecordDO) {
// operateRecordDO.setResource(null);
// int result = operateRecordService.insert(operateRecordDO);
// Assert.assertEquals(result, 0);
// }
@Test(description = "插入的重载方法操作成功测试")
public void insert2SuccessTest1() {
Map<String, String> content = new HashMap<>();
content.put("key", "value");
int result = operateRecordService.insert("admin", ModuleEnum.CLUSTER, "testOpRecord", OperateEnum.ADD, content);
Assert.assertEquals(result, 1);
}
// @Test(description = "插入的重载方法操作失败测试")
// public void insert2FailureTest1() {
// Map<String, String> content = new HashMap<>();
// content.put("key", "value");
// int result = operateRecordService.insert(null, ModuleEnum.CLUSTER, "testOpRecord", OperateEnum.ADD, content);
// Assert.assertEquals(result, 0);
// }
@Test(dataProvider = "operateRecordDO")
public void queryByConditionTest(OperateRecordDO operateRecordDO) {
operateRecordService.insert(operateRecordDO);
// endTime和startTime都是null
queryByConditionTest3(operateRecordDO);
// startTime是null
queryByConditionTest1(operateRecordDO);
// endTime是null
queryByConditionTest2(operateRecordDO);
// endTime和startTime都不是null
queryByConditionTest4(operateRecordDO);
}
private void queryByConditionTest1(OperateRecordDO operateRecordDO) {
OperateRecordDTO dto = getOperateRecordDTO();
dto.setEndTime(new Date().getTime());
List<OperateRecordDO> queryResult = operateRecordService.queryByCondition(dto);
Assert.assertFalse(queryResult.isEmpty());
// 判断查询得到的OperateRecordDO中日期是否符合要求
Assert.assertTrue(queryResult.stream().allMatch(operateRecordDO1 ->
operateRecordDO1.getCreateTime().after(new Date(0L)) &&
operateRecordDO1.getCreateTime().before(new Date()) &&
operateRecordDO1.getModuleId().equals(dto.getModuleId()) &&
operateRecordDO1.getOperateId().equals(dto.getOperateId()) &&
operateRecordDO1.getOperator().equals(dto.getOperator())));
}
private void queryByConditionTest2(OperateRecordDO operateRecordDO) {
OperateRecordDTO dto = getOperateRecordDTO();
dto.setStartTime(new Date().getTime());
// 查询的是create_time >= startTime, 因为创建时间在当前时间之前,因此查到的数据是空的
List<OperateRecordDO> queryResult = operateRecordService.queryByCondition(dto);
Assert.assertTrue(queryResult.isEmpty());
}
private void queryByConditionTest3(OperateRecordDO operateRecordDO) {
OperateRecordDTO dto = getOperateRecordDTO();
List<OperateRecordDO> queryResult = operateRecordService.queryByCondition(dto);
Assert.assertFalse(queryResult.isEmpty());
Assert.assertTrue(queryResult.stream().allMatch(operateRecordDO1 ->
operateRecordDO1.getCreateTime().after(new Date(0L)) &&
operateRecordDO1.getCreateTime().before(new Date()) &&
operateRecordDO1.getModuleId().equals(dto.getModuleId()) &&
operateRecordDO1.getOperateId().equals(dto.getOperateId()) &&
operateRecordDO1.getOperator().equals(dto.getOperator())));
}
private void queryByConditionTest4(OperateRecordDO operateRecordDO) {
OperateRecordDTO dto = getOperateRecordDTO();
dto.setStartTime(0L);
dto.setEndTime(1649036393371L);
List<OperateRecordDO> queryResult = operateRecordService.queryByCondition(dto);
Assert.assertFalse(queryResult.isEmpty());
Assert.assertTrue(queryResult.stream().allMatch(operateRecordDO1 ->
operateRecordDO1.getCreateTime().after(new Date(0L)) &&
operateRecordDO1.getCreateTime().before(new Date()) &&
operateRecordDO1.getModuleId().equals(dto.getModuleId()) &&
operateRecordDO1.getOperateId().equals(dto.getOperateId()) &&
operateRecordDO1.getOperator().equals(dto.getOperator())));
}
}

View File

@@ -0,0 +1,452 @@
package com.xiaojukeji.kafka.manager.service.service;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.pojo.LogicalClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.RegionDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.*;
import java.util.stream.Collectors;
/**
* @author wyc
* @date 2021/12/8
*/
public class RegionServiceTest extends BaseTest{
@Autowired
private RegionService regionService;
@DataProvider(name = "regionDO")
public Object[][] provideRegionDO() {
RegionDO regionDO = new RegionDO();
regionDO.setStatus(0);
regionDO.setName("region1");
// 物理集群id
regionDO.setClusterId(1L);
regionDO.setDescription("test");
List<Integer> brokerIdList = new ArrayList<>();
brokerIdList.add(1);
brokerIdList.add(2);
regionDO.setBrokerList(ListUtils.intList2String(brokerIdList));
return new Object[][] {{regionDO}};
}
@Test(description = "creatRegion, 参数为null测试")
public void createRegion2ParamIllegalTest() {
Assert.assertEquals(regionService.createRegion(null), ResultStatus.PARAM_ILLEGAL);
}
@Test(dataProvider = "regionDO", description = "createRegion, 成功测试")
public void createRegion2SuccessTest(RegionDO regionDO) {
Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS);
}
@Test(dataProvider = "regionDO", description = "createRegion, clusterId为空测试")
public void createRegion2ExistBrokerIdAlreadyInRegionTest1(RegionDO regionDO) {
regionDO.setClusterId(null);
Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.RESOURCE_ALREADY_USED);
}
@Test(dataProvider = "regionDO", description = "createRegion, 创建时传入的brokerList中有被使用过的")
public void createRegion2ExistBrokerIdAlreadyInRegionTest2(RegionDO regionDO) {
// 首先创建一个Region, 使用1,2broker
Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS);
// 再创建一个Region, 使用1,3broker
List<Integer> newBrokerIdList = new ArrayList<>();
newBrokerIdList.add(1);
newBrokerIdList.add(3);
regionDO.setBrokerList(ListUtils.intList2String(newBrokerIdList));
Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.RESOURCE_ALREADY_USED);
}
@Test(dataProvider = "regionDO", description = "createRegion, 创建时region使用到的broker挂掉了")
public void createRegion2BrokerNotExistTest(RegionDO regionDO) {
// 传入一个不存在的物理集群检测时会认为该集群存活的broker个数为0
regionDO.setClusterId(5L);
Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.BROKER_NOT_EXIST);
}
@Test(dataProvider = "regionDO", description = "createRegion, 创建时regionName重复")
public void createRegion2ResourceAlreadyExistTest(RegionDO regionDO) {
// 先插入一个
Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS);
// 插入同名Region注意brokerList需要保持不一样不然会返回RESOURCE_ALREADY_USED
List<Integer> brokerIdList = new ArrayList<>();
brokerIdList.add(3);
regionDO.setBrokerList(ListUtils.intList2String(brokerIdList));
Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.RESOURCE_ALREADY_EXISTED);
}
@Test(dataProvider = "regionDO")
public void deleteByIdTest(RegionDO regionDO) {
// 参数非法测试
deleteById2ParamIllegalTest(regionDO);
// 资源不存在测试
deleteById2ResourceNotExistTest(regionDO);
// 删除成功测试
deleteById2SuccessTest(regionDO);
}
private void deleteById2ParamIllegalTest(RegionDO regionDO) {
Assert.assertEquals(regionService.deleteById(null), ResultStatus.PARAM_ILLEGAL);
}
private void deleteById2ResourceNotExistTest(RegionDO regionDO) {
Assert.assertEquals(regionService.deleteById(10L), ResultStatus.RESOURCE_NOT_EXIST);
}
private void deleteById2SuccessTest(RegionDO regionDO) {
regionDO.setId(1L);
Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS);
// 插入时xml文件中没用到idid交给数据库自增因此需要先查出Region的id再根据id删除
List<RegionDO> regionDOList = regionService.getByClusterId(1L);
RegionDO region = regionDOList.get(0);
Assert.assertEquals(regionService.deleteById(region.getId()), ResultStatus.SUCCESS);
}
@Test(dataProvider = "regionDO", description = "updateRegion, 参数非法测试")
public void updateRegion2ParamIllegalTest1(RegionDO regionDO) {
Assert.assertEquals(regionService.updateRegion(null), ResultStatus.PARAM_ILLEGAL);
Assert.assertEquals(regionService.updateRegion(regionDO), ResultStatus.PARAM_ILLEGAL);
}
@Test(dataProvider = "regionDO", description = "updateRegion, 资源不存在测试")
public void updateRegion2ResourceNotExistTest1(RegionDO regionDO) {
// 不插入Region直接更新
regionDO.setId(1L);
Assert.assertEquals(regionService.updateRegion(regionDO), ResultStatus.RESOURCE_NOT_EXIST);
}
@Test(dataProvider = "regionDO", description = "updateRegion, brokerList未改变成功测试")
public void updateRegion2SuccessWithBrokerListNotChangeTest1(RegionDO regionDO) {
// 先在数据库中创建一个Region
Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS);
// 查询出创建的Region,并修改一些参数后作为新的Region
List<RegionDO> regionDOList = regionService.getByClusterId(1L);
RegionDO newRegionDO = regionDOList.get(0);
newRegionDO.setStatus(1);
Assert.assertEquals(regionService.updateRegion(newRegionDO), ResultStatus.SUCCESS);
}
@Test(dataProvider = "regionDO", description = "updateRegion, 传入的broker已经被使用测试")
public void updateRegion2ResourceAlreadyUsedTest1(RegionDO regionDO) {
// 先在数据库中创建一个Region
Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS);
// 查询出创建的Region,并修改brokerList后作为新的Region
List<RegionDO> regionDOList = regionService.getByClusterId(1L);
RegionDO newRegionDO = regionDOList.get(0);
List<Integer> newBrokerIdList = new ArrayList<>();
newBrokerIdList.add(1);
newBrokerIdList.add(3);
// 更新Region的brokerList
newRegionDO.setBrokerList(ListUtils.intList2String(newBrokerIdList));
// 构造情况
newRegionDO.setClusterId(null);
Assert.assertEquals(regionService.updateRegion(newRegionDO), ResultStatus.RESOURCE_ALREADY_USED);
}
@Test(dataProvider = "regionDO", description = "updateRegion, 更新的broker不存在")
public void updateRegion2BrokerNotExistTest1(RegionDO regionDO) {
// 先在数据库中创建一个Region
Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS);
// 查询出创建的Region,并修改brokerList后作为新的Region
List<RegionDO> regionDOList = regionService.getByClusterId(1L);
RegionDO newRegionDO = regionDOList.get(0);
// 构造情况
List<Integer> newBrokerIdList = new ArrayList<>();
newBrokerIdList.add(4);
newBrokerIdList.add(5);
newRegionDO.setBrokerList(ListUtils.intList2String(newBrokerIdList));
Assert.assertEquals(regionService.updateRegion(newRegionDO), ResultStatus.BROKER_NOT_EXIST);
}
@Test(dataProvider = "regionDO", description = "updateRegion, brokeList发生了改变成功测试")
public void updateRegion2SuccessWithBrokerListChangeTest1(RegionDO regionDO) {
// 先在数据库中创建一个Region
Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS);
// 查询出创建的Region,并修改brokerList后作为新的Region
List<RegionDO> regionDOList = regionService.getByClusterId(1L);
RegionDO newRegionDO = regionDOList.get(0);
// 构造情况
List<Integer> newBrokerIdList = new ArrayList<>();
newBrokerIdList.add(1);
newBrokerIdList.add(3);
newRegionDO.setBrokerList(ListUtils.intList2String(newBrokerIdList));
Assert.assertEquals(regionService.updateRegion(newRegionDO), ResultStatus.SUCCESS);
}
@Test(dataProvider = "regionDO", description = "updateRegion重载方法参数非法测试")
public void updateRegion2ParamIllegalTest2(RegionDO regionDO) {
Assert.assertEquals(regionService.updateRegion(null, "1,3"), ResultStatus.PARAM_ILLEGAL);
Assert.assertEquals(regionService.updateRegion(1L, "1, 3"), ResultStatus.PARAM_ILLEGAL);
}
@Test(dataProvider = "regionDO", description = "updateRegion重载方法成功测试")
public void updateRegion2SuccessTest2(RegionDO regionDO) {
Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS);
List<RegionDO> regionDOList = regionService.getByClusterId(1L);
RegionDO region = regionDOList.get(0);
Assert.assertEquals(regionService.updateRegion(region.getId(), "1,3"), ResultStatus.SUCCESS);
}
@Test(dataProvider = "regionDO")
public void updateCapacityByIdTest(RegionDO regionDO) {
Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS);
RegionDO region = regionService.getByClusterId(1L).get(0);
region.setCapacity(1000L);
// 成功测试
updateCapacityById2SuccessTest(region);
// 失败测试
// 集群中不存在regionId是100的
region.setId(100L);
updateCapacityByIdFailureTest(region);
}
private void updateCapacityById2SuccessTest(RegionDO regionDO) {
Assert.assertEquals(regionService.updateCapacityById(regionDO), 1);
}
private void updateCapacityByIdFailureTest(RegionDO regionDO) {
Assert.assertEquals(regionService.updateCapacityById(regionDO), 0);
}
@Test(dataProvider = "regionDO")
public void getByIdTest(RegionDO regionDO) {
Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS);
// 获取成功测试
RegionDO region = regionService.getByClusterId(1L).get(0);
getById2SuccessTest(region);
// 获取失败测试
region.setId(-1L);
getById2FailureTest(region);
}
private void getById2SuccessTest(RegionDO regionDO) {
Assert.assertEquals(regionService.getById(regionDO.getId()).toString(), regionDO.toString());
}
private void getById2FailureTest(RegionDO regionDO) {
Assert.assertNull(regionService.getById(regionDO.getId()));
}
@Test(dataProvider = "regionDO")
public void getByClusterIdTest(RegionDO regionDO) {
regionService.createRegion(regionDO);
// 获取成功测试
getByClusterId2SuccessTest(regionDO);
// 获取失败测试
getByClusterId2FailureTest(regionDO);
}
private void getByClusterId2SuccessTest(RegionDO regionDO) {
Assert.assertNotNull(regionService.getByClusterId(regionDO.getClusterId()));
Assert.assertTrue(regionService.getByClusterId(regionDO.getClusterId()).stream().allMatch(regionDO1 ->
regionDO1.getName().equals(regionDO.getName()) &&
regionDO1.getBrokerList().equals(regionDO.getBrokerList())));
}
private void getByClusterId2FailureTest(RegionDO regionDO) {
Assert.assertTrue(regionService.getByClusterId(-1L).isEmpty());
}
@Test(dataProvider = "regionDO")
public void listAllTest(RegionDO regionDO) {
Assert.assertTrue(regionService.listAll().isEmpty());
regionService.createRegion(regionDO);
Assert.assertNotNull(regionService.listAll());
Assert.assertTrue(regionService.listAll().stream().allMatch(regionDO1 ->
regionDO1.getName().equals(regionDO.getName()) &&
regionDO1.getBrokerList().equals(regionDO.getBrokerList())));
}
@Test(dataProvider = "regionDO")
public void getRegionNumTest(RegionDO regionDO) {
// 插入一条数据
regionService.createRegion(regionDO);
Map<Long, Integer> regionNum = regionService.getRegionNum();
for(Map.Entry<Long, Integer> entry : regionNum.entrySet()) {
Assert.assertEquals(entry.getKey(), Long.valueOf(1));
Assert.assertEquals(entry.getValue(), Integer.valueOf(1));
}
}
@Test(dataProvider = "regionDO")
public void getFullBrokerIdListTest(RegionDO regionDO) {
List<Integer> brokerIdList = new ArrayList<>();
brokerIdList.add(3);
// regionId是null测试
getFullBrokerIdList2RegionIdIsNullTest(regionDO, brokerIdList);
// 数据库中不存在对应的regionId数据
getFullBrokerIdList2RegionNotExistTest(regionDO, brokerIdList);
Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS);
RegionDO region = regionService.getByClusterId(1L).get(0);
// 传进来的brokerList是空的
getFullBrokerIdList2BrokerIdListIsEmpty(regionDO, region, new ArrayList<>());
// 传进来的brokerList不是空的
getFullBrokerIdList2Success(regionDO, region, brokerIdList);
}
private void getFullBrokerIdList2RegionIdIsNullTest(RegionDO regionDO, List<Integer> brokerIdList) {
List<Integer> fullBrokerIdList = regionService.getFullBrokerIdList(1L, null, brokerIdList);
Assert.assertEquals(fullBrokerIdList, brokerIdList);
}
private void getFullBrokerIdList2RegionNotExistTest(RegionDO regionDO, List<Integer> brokerIdList) {
Assert.assertEquals(regionService.getFullBrokerIdList(1L, -1L, brokerIdList), brokerIdList);
}
private void getFullBrokerIdList2BrokerIdListIsEmpty(RegionDO regionDO, RegionDO regionInDataBase, List<Integer> brokerIdList) {
List<Integer> fullBrokerIdList = regionService.getFullBrokerIdList(1L, regionInDataBase.getId(), brokerIdList);
Assert.assertEquals(fullBrokerIdList, ListUtils.string2IntList(regionInDataBase.getBrokerList()));
}
private void getFullBrokerIdList2Success(RegionDO regionDO, RegionDO regionInDataBase, List<Integer> brokerIdList) {
List<Integer> fullBrokerIdList = regionService.getFullBrokerIdList(1L, regionInDataBase.getId(), brokerIdList);
List<Integer> allBrokerIdList = ListUtils.string2IntList(regionInDataBase.getBrokerList());
allBrokerIdList.addAll(brokerIdList);
Assert.assertEquals(allBrokerIdList, fullBrokerIdList);
}
@Test(dataProvider = "regionDO")
public void convert2BrokerIdRegionMapTest(RegionDO regionDO) {
Assert.assertEquals(regionService.createRegion(regionDO), ResultStatus.SUCCESS);
List<RegionDO> regionDOList = regionService.getByClusterId(1L);
// regionDOList是null测试
convert2BrokerIdRegionMap2RegionListDOIsNull();
// 成功测试
convert2BrokerIdRegionMap2Success(regionDO);
}
private void convert2BrokerIdRegionMap2RegionListDOIsNull() {
Assert.assertTrue(regionService.convert2BrokerIdRegionMap(null).isEmpty());
}
private void convert2BrokerIdRegionMap2Success(RegionDO regionDO) {
// 预期结果, key是brokerId, value是Region
List<RegionDO> regionDOList = regionService.getByClusterId(1L);
RegionDO region = regionDOList.get(0);
Map<Integer, RegionDO> brokerIdRegionDOMap = ListUtils.string2IntList(regionDO.getBrokerList()).stream().collect(Collectors.toMap(brokerId -> brokerId, regionDO1 -> region));
// 实际结果
Map<Integer, RegionDO> result = regionService.convert2BrokerIdRegionMap(regionDOList);
Assert.assertEquals(brokerIdRegionDOMap, result);
}
@Test(dataProvider = "regionDO")
public void getIdleRegionBrokerListTest(RegionDO regionDO) {
// 物理集群id和regionIdList是null测试
getIdleRegionBrokerList2PhysicalClusterIdIsNullTest();
// 参数物理集群下的regionDOList为空测试
getIdleRegionBrokerList2RegionDOListIsEmptyTest();
// 成功测试
getIdleRegionBrokerList2SuccessTest(regionDO);
}
private void getIdleRegionBrokerList2PhysicalClusterIdIsNullTest() {
Assert.assertNull(regionService.getIdleRegionBrokerList(null, new ArrayList<>()));
}
private void getIdleRegionBrokerList2RegionDOListIsEmptyTest() {
List<Long> regionIdList = new ArrayList<>();
regionIdList.add(1L);
Assert.assertNull(regionService.getIdleRegionBrokerList(1L, regionIdList));
}
private void getIdleRegionBrokerList2SuccessTest(RegionDO regionDO) {
// 先插入
regionService.createRegion(regionDO);
// 从数据库中查找
List<Long> regionIdList = regionService.getByClusterId(1L).stream().map(RegionDO::getId).collect(Collectors.toList());
List<Integer> brokerIdList = regionService.getByClusterId(1L)
.stream().flatMap(regionDO1 -> ListUtils.string2IntList(regionDO1.getBrokerList()).stream())
.collect(Collectors.toList());
Assert.assertEquals(regionService.getIdleRegionBrokerList(1L, regionIdList), brokerIdList);
}
@Test
public void getTopicNameRegionBrokerIdMap2SuccessTest() {
// 创建逻辑集群创建Topic均已在数据库写入
// 逻辑集群基于物理集群1建立region的brokerList是12
// Topic基于region建立也就是使用到broker1和2
// 这个方法是返回topicName -> topic所使用broker以及这些broker所在region中所有的broker
Map<String, Set<Integer>> topicNameRegionBrokerIdMap = regionService.getTopicNameRegionBrokerIdMap(1L);
Map<String, Set<Integer>> expectedMap = new HashMap<>();
Set<Integer> set = new HashSet<>();
set.add(1);
set.add(2);
expectedMap.put("topic_a", set);
Assert.assertEquals(topicNameRegionBrokerIdMap, expectedMap);
}
@Test
public void getRegionListByTopicNameTest() {
// 数据库中依然建立了Region, LogicalCluster, Topic
getRegionListByTopicName2EmptyTest();
// 返回集合不为空测试
getRegionListByTopicName2Success();
}
private void getRegionListByTopicName2EmptyTest() {
// 传入一个不存在的topic
Assert.assertEquals(regionService.getRegionListByTopicName(1L, "notExistTopic"), new ArrayList<>());
}
private void getRegionListByTopicName2Success() {
List<RegionDO> expectedResult = regionService.getByClusterId(1L);
Assert.assertEquals(regionService.getRegionListByTopicName(1L, "topic_a"), expectedResult);
}
}

View File

@@ -0,0 +1,144 @@
package com.xiaojukeji.kafka.manager.service.service;
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum;
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicThrottledMetrics;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicThrottledMetricsDO;
import com.xiaojukeji.kafka.manager.dao.TopicThrottledMetricsDao;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.springframework.beans.factory.annotation.Autowired;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.*;
/**
* @author wyc
* @date 2021/12/24
*/
public class ThrottleServiceTest extends BaseTest {
private final static Long REAL_CLUSTER_ID_IN_MYSQL = 1L;
private final static String REAL_TOPIC_IN_ZK = "topic_a";
private final static String ADMIN_NAME_IN_MYSQL = "admin";
private final static String KAFKA_MANAGER_APP_NAME = "KM管理员";
private final static String KAFKA_MANAGER_APP_ID = "dkm_admin";
private final static Set<Integer> REAL_BROKER_ID_SET = new HashSet<>();
private final static String REAL_REGION_IN_CLUSTER = "region1";
private final static String REAL_LOGICAL_CLUSTER_NAME = "logical_cluster_1";
// 共享集群
private final static Integer REAL_LOGICAL_CLUSTER_MODE = 0;
static {
REAL_BROKER_ID_SET.add(1);
REAL_BROKER_ID_SET.add(2);
}
@Autowired
@InjectMocks
private ThrottleService throttleService;
@Mock
private TopicThrottledMetricsDao topicThrottleDao;
@Mock
private JmxService jmxService;
@BeforeMethod
public void init() {
MockitoAnnotations.initMocks(this);
}
private TopicThrottledMetricsDO getTopicThrottledMetricsDO() {
TopicThrottledMetricsDO metricsDO = new TopicThrottledMetricsDO();
metricsDO.setAppId(KAFKA_MANAGER_APP_ID);
metricsDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
metricsDO.setTopicName(REAL_TOPIC_IN_ZK);
return metricsDO;
}
private TopicThrottledMetrics getTopicThrottledMetrics() {
TopicThrottledMetrics metrics = new TopicThrottledMetrics();
metrics.setClientType(KafkaClientEnum.PRODUCE_CLIENT);
metrics.setTopicName(REAL_TOPIC_IN_ZK);
metrics.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
return metrics;
}
@Test
public void insertBatchTest() {
// dataList为空测试
Assert.assertEquals(throttleService.insertBatch(new ArrayList<>()), 0);
// 成功测试
insertBatch2SuccessTest();
}
private void insertBatch2SuccessTest() {
TopicThrottledMetricsDO metricsDO = getTopicThrottledMetricsDO();
List<TopicThrottledMetricsDO> doList = new ArrayList<>();
doList.add(metricsDO);
Mockito.when(topicThrottleDao.insertBatch(Mockito.anyList())).thenReturn(3);
Assert.assertTrue(throttleService.insertBatch(doList) > 0);
}
@Test
public void getTopicThrottleFromDBTest() {
// 返回空集合测试
getTopicThrottleFromDB2TopicThrottleDOListIsEmptyTest();
// 成功测试
getTopicThrottleFromDB2SuccessTest();
}
private void getTopicThrottleFromDB2SuccessTest() {
TopicThrottledMetricsDO metricsDO = getTopicThrottledMetricsDO();
List<TopicThrottledMetricsDO> metricsDOList = new ArrayList<>();
metricsDOList.add(metricsDO);
Mockito.when(topicThrottleDao.getTopicThrottle(Mockito.anyLong(), Mockito.anyString(), Mockito.anyString(), Mockito.any(), Mockito.any())).thenReturn(metricsDOList);
Assert.assertTrue(throttleService.getTopicThrottleFromDB(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK, KAFKA_MANAGER_APP_ID, new Date(), new Date()).stream().allMatch(metricsDO1 ->
metricsDO1.getAppId().equals(metricsDO.getAppId()) &&
metricsDO1.getClusterId().equals(metricsDO.getClusterId()) &&
metricsDO1.getTopicName().equals(metricsDO.getTopicName())));
}
private void getTopicThrottleFromDB2TopicThrottleDOListIsEmptyTest() {
Mockito.when(topicThrottleDao.getAppIdThrottle(Mockito.anyLong(), Mockito.anyString(), Mockito.any(), Mockito.any())).thenReturn(new ArrayList<>());
Assert.assertTrue(throttleService.getTopicThrottleFromDB(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK, KAFKA_MANAGER_APP_ID, new Date(), new Date()).isEmpty());
}
@Test
public void getThrottledTopicsFromJmxTest() {
// 返回空集合测试
getThrottledTopicsFromJmx2ReturnEmptyTest();
// 返回成功测试
getThrottledTopicsFromJmx2SuccessTest();
}
private void getThrottledTopicsFromJmx2ReturnEmptyTest() {
Assert.assertTrue(throttleService.getThrottledTopicsFromJmx(REAL_CLUSTER_ID_IN_MYSQL, REAL_BROKER_ID_SET, new ArrayList<>()).isEmpty());
}
private void getThrottledTopicsFromJmx2SuccessTest() {
List<KafkaClientEnum> clientList = new ArrayList<>();
clientList.add(KafkaClientEnum.PRODUCE_CLIENT);
Assert.assertTrue(throttleService.getThrottledTopicsFromJmx(REAL_CLUSTER_ID_IN_MYSQL, REAL_BROKER_ID_SET, clientList).isEmpty());
}
}

View File

@@ -0,0 +1,65 @@
package com.xiaojukeji.kafka.manager.service.service;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicExpiredDO;
import com.xiaojukeji.kafka.manager.dao.TopicExpiredDao;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.List;
/**
* @author wyc
* @date 2021/12/20
*/
public class TopicExpiredServiceTest extends BaseTest {
@Autowired
private TopicExpiredDao topicExpiredDao;
@Autowired
private TopicExpiredService topicExpiredService;
private TopicExpiredDO getTopicExpiredDO() {
TopicExpiredDO topicExpiredDO = new TopicExpiredDO();
topicExpiredDO.setClusterId(1L);
topicExpiredDO.setExpiredDay(30);
topicExpiredDO.setTopicName("topic_a");
topicExpiredDO.setStatus(0);
return topicExpiredDO;
}
@Test
public void retainExpiredTopicTest() {
// 参数非法测试
Assert.assertEquals(topicExpiredService.retainExpiredTopic(1L, "topic_a", -1), ResultStatus.PARAM_ILLEGAL);
// Topic不存在测试
Assert.assertEquals(topicExpiredService.retainExpiredTopic(1L, "topicNotExist", 40), ResultStatus.TOPIC_NOT_EXIST);
// 成功测试
// 过期Topic插入到topic_expired表中时会先检查这个Topic是否在这个物理集群中所以测试基于集群中建立了"topic_a"的topic
topicExpiredDao.replace(getTopicExpiredDO());
Assert.assertEquals(topicExpiredService.retainExpiredTopic(1L, getTopicExpiredDO().getTopicName(), 40), ResultStatus.SUCCESS);
}
@Test
public void deleteByNameTest() {
// 删除失败
Assert.assertEquals(topicExpiredService.deleteByTopicName(1L, "notExistTopic"), 0);
// 删除成功
// 先在topic_expired表中插入数据,可以插入不存在的topic因为这个删除只是从数据库中删除删除的时候并没有检验topic是否存在于集群
// 根据返回值判断是否删除成功
TopicExpiredDO topicExpiredDO = getTopicExpiredDO();
topicExpiredDO.setTopicName("test-topic");
topicExpiredDao.replace(topicExpiredDO);
Assert.assertEquals(topicExpiredService.deleteByTopicName(getTopicExpiredDO().getClusterId(), "test-topic"), 1);
}
}

View File

@@ -0,0 +1,752 @@
package com.xiaojukeji.kafka.manager.service.service;
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum;
import com.xiaojukeji.kafka.manager.common.bizenum.TopicAuthorityEnum;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.ao.RdTopicBasic;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.MineTopicSummary;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicAppData;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicBusinessInfo;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicDTO;
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicThrottledMetrics;
import com.xiaojukeji.kafka.manager.common.entity.pojo.*;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AuthorityDO;
import com.xiaojukeji.kafka.manager.common.utils.ListUtils;
import com.xiaojukeji.kafka.manager.dao.TopicDao;
import com.xiaojukeji.kafka.manager.dao.TopicExpiredDao;
import com.xiaojukeji.kafka.manager.dao.TopicStatisticsDao;
import com.xiaojukeji.kafka.manager.service.cache.LogicalClusterMetadataManager;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import com.xiaojukeji.kafka.manager.service.service.gateway.AppService;
import com.xiaojukeji.kafka.manager.service.service.gateway.AuthorityService;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.springframework.beans.factory.annotation.Autowired;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.*;
/**
* @author wyc
* @date 2021/12/21
*/
public class TopicManagerServiceTest extends BaseTest {
private final static Long REAL_CLUSTER_ID_IN_MYSQL = 1L;
private final static String REAL_TOPIC_IN_ZK = "topic_a";
private final static String ADMIN_NAME_IN_MYSQL = "admin";
private final static String KAFKA_MANAGER_APP_NAME = "KM管理员";
private final static String KAFKA_MANAGER_APP_ID = "dkm_admin";
private final static Set<Integer> REAL_BROKER_ID_SET = new HashSet<>();
private final static String REAL_REGION_IN_CLUSTER = "region1";
private final static String REAL_LOGICAL_CLUSTER_NAME = "logical_cluster_1";
// 共享集群
private final static Integer REAL_LOGICAL_CLUSTER_MODE = 0;
static {
REAL_BROKER_ID_SET.add(1);
REAL_BROKER_ID_SET.add(2);
}
@Autowired
@InjectMocks
private TopicManagerService topicManagerService;
@Mock
private TopicDao topicDao;
@Mock
private TopicStatisticsDao topicStatisticsDao;
@Mock
private TopicExpiredDao topicExpiredDao;
@Mock
private AppService appService;
@Mock
private ClusterService clusterService;
@Mock
private AuthorityService authorityService;
@Mock
private ThrottleService throttleService;
@Mock
private RegionService regionService;
@Mock
private LogicalClusterMetadataManager logicalClusterMetadataManager;
@BeforeMethod
public void init() {
MockitoAnnotations.initMocks(this);
}
private TopicDO getTopicDO() {
TopicDO topicDO = new TopicDO();
topicDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
topicDO.setTopicName("test_topic");
topicDO.setAppId("appId");
topicDO.setPeakBytesIn(100L);
return topicDO;
}
private TopicDO getTopicDOInCluster() {
// 这个Topic是通过工单手动在物理集群中插入的
TopicDO topicDO = new TopicDO();
topicDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
topicDO.setTopicName(REAL_TOPIC_IN_ZK);
topicDO.setAppId(KAFKA_MANAGER_APP_ID);
topicDO.setPeakBytesIn(100L);
return topicDO;
}
private TopicStatisticsDO getTopicStatisticsDO() {
// cluster_id, topic_name, offset_sum, max_avg_bytes_in, gmt_day
TopicStatisticsDO topicStatisticsDO = new TopicStatisticsDO();
topicStatisticsDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
topicStatisticsDO.setTopicName(REAL_TOPIC_IN_ZK);
topicStatisticsDO.setOffsetSum(1L);
topicStatisticsDO.setMaxAvgBytesIn(1.0);
topicStatisticsDO.setGmtDay("2020-03-30");
return topicStatisticsDO;
}
private TopicExpiredDO getTopicExpiredDO() {
TopicExpiredDO topicExpiredDO = new TopicExpiredDO();
return topicExpiredDO;
}
private AuthorityDO getAuthorityDO() {
AuthorityDO authorityDO = new AuthorityDO();
authorityDO.setAccess(3);
authorityDO.setAppId(KAFKA_MANAGER_APP_ID);
authorityDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
authorityDO.setTopicName(REAL_TOPIC_IN_ZK);
return authorityDO;
}
private TopicThrottledMetrics getTopicThrottledMetrics() {
TopicThrottledMetrics metrics = new TopicThrottledMetrics();
metrics.setAppId(KAFKA_MANAGER_APP_ID);
metrics.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
metrics.setTopicName(REAL_TOPIC_IN_ZK);
metrics.setClientType(KafkaClientEnum.PRODUCE_CLIENT);
metrics.setBrokerIdSet(REAL_BROKER_ID_SET);
return metrics;
}
private TopicAppData getTopicAppData() {
TopicAppData data = new TopicAppData();
data.setAccess(3);
data.setAppId(KAFKA_MANAGER_APP_ID);
data.setAppName(KAFKA_MANAGER_APP_NAME);
data.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
data.setTopicName(REAL_TOPIC_IN_ZK);
data.setAppPrincipals(ADMIN_NAME_IN_MYSQL);
data.setConsumerQuota(15728640L);
data.setProduceQuota(15728640L);
return data;
}
private ClusterDO getClusterDO() {
ClusterDO clusterDO = new ClusterDO();
clusterDO.setId(REAL_CLUSTER_ID_IN_MYSQL);
return clusterDO;
}
private RegionDO getRegionDO() {
RegionDO regionDO = new RegionDO();
regionDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
regionDO.setBrokerList(ListUtils.intList2String(new ArrayList<>(REAL_BROKER_ID_SET)));
regionDO.setName(REAL_REGION_IN_CLUSTER);
return regionDO;
}
private RdTopicBasic getRdTopicBasic() {
RdTopicBasic rdTopicBasic = new RdTopicBasic();
rdTopicBasic.setAppId(KAFKA_MANAGER_APP_ID);
rdTopicBasic.setAppName(KAFKA_MANAGER_APP_NAME);
List<String> regionNameList = new ArrayList<>();
regionNameList.add(REAL_REGION_IN_CLUSTER);
rdTopicBasic.setRegionNameList(regionNameList);
return rdTopicBasic;
}
private TopicBusinessInfo getTopicBusinessInfo() {
TopicBusinessInfo info = new TopicBusinessInfo();
info.setAppId(KAFKA_MANAGER_APP_ID);
info.setAppName(KAFKA_MANAGER_APP_NAME);
info.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
info.setPrincipals(ADMIN_NAME_IN_MYSQL);
info.setTopicName(REAL_TOPIC_IN_ZK);
return info;
}
private LogicalClusterDO getLogicalClusterDO() {
LogicalClusterDO logicalClusterDO = new LogicalClusterDO();
logicalClusterDO.setName(REAL_LOGICAL_CLUSTER_NAME);
logicalClusterDO.setMode(REAL_LOGICAL_CLUSTER_MODE);
logicalClusterDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
logicalClusterDO.setAppId(KAFKA_MANAGER_APP_ID);
return logicalClusterDO;
}
@Test
public void addTopicTest() {
// addTopic只是向数据库topic表中写数据
// 成功测试
Mockito.when(topicDao.insert(Mockito.any())).thenReturn(1);
Assert.assertEquals(topicManagerService.addTopic(getTopicDO()), 1);
// 失败测试,再次插入相同的Topic
Mockito.when(topicDao.insert(Mockito.any())).thenReturn(0);
Assert.assertEquals(topicManagerService.addTopic(getTopicDO()), 0);
}
@Test
public void deleteByTopicNameTest() {
// 删除也只是删除topic表中的数据
// 删除失败测试,删除一个不存在的
Mockito.when(topicDao.deleteByName(Mockito.anyLong(), Mockito.anyString())).thenReturn(0);
Assert.assertEquals(topicManagerService.deleteByTopicName(getTopicDO().getClusterId(), "notExistTopic"), 0);
// 删除成功测试
Mockito.when(topicDao.deleteByName(Mockito.anyLong(), Mockito.anyString())).thenReturn(1);
Assert.assertEquals(topicManagerService.deleteByTopicName(getTopicDO().getClusterId(), getTopicDO().getTopicName()), 1);
}
@Test(description = "物理集群中不存在该topic时的删除操作")
public void modifyTopic2TopicNotExist() {
Mockito.when(topicDao.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(null);
Assert.assertEquals(topicManagerService.modifyTopic(getTopicDO().getClusterId(), "notExistTopic", null, null), ResultStatus.TOPIC_NOT_EXIST);
}
@Test(description = "modifyTopic, 成功")
public void modifyTopic2Success() {
TopicDO topicDO = getTopicDOInCluster();
// 因为会检查集群中是否存在这个Topic因此直接用KM创建topic用这个Topic测试
Mockito.when(topicDao.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(topicDO);
Mockito.when(topicDao.updateByName(Mockito.any())).thenReturn(1);
Assert.assertEquals(topicManagerService.modifyTopic(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK, "更改过", "operator"), ResultStatus.SUCCESS);
}
@Test(description = "modifyTopicByOp, 物理集群中不存在该topic")
public void modifyTopicByOp2TopicNotExistTest() {
Assert.assertEquals(topicManagerService.modifyTopicByOp(getTopicDOInCluster().getClusterId(), "notExistTopic", "dkm_admin", null, "admin"), ResultStatus.TOPIC_NOT_EXIST);
}
@Test(description = "modifyTopicByOp, 物理集群中不存在传入的appId")
public void modifyTopicByOp2AppNotExistTest() {
Mockito.when(appService.getByAppId(Mockito.anyString())).thenReturn(null);
Assert.assertEquals(topicManagerService.modifyTopicByOp(getTopicDOInCluster().getClusterId(), getTopicDOInCluster().getTopicName(), "notExistAppId", null, "admin"), ResultStatus.APP_NOT_EXIST);
}
@Test(description = "modifyTopicByOp, 成功测试")
public void modifyTopicByOp2SuccessTest() {
AppDO appDO = getAppDO();
Mockito.when(appService.getByAppId(Mockito.anyString())).thenReturn(appDO);
Assert.assertEquals(topicManagerService.modifyTopicByOp(getTopicDOInCluster().getClusterId(), getTopicDOInCluster().getTopicName(), getTopicDOInCluster().getAppId(), "", "admin"), ResultStatus.SUCCESS);
}
private void addAuthority2ClusterNotExistTest() {
AuthorityDO authorityDO = getAuthorityDO();
AppDO appDO = getAppDO();
Mockito.when(appService.getByAppId(Mockito.anyString())).thenReturn(appDO);
Mockito.when(logicalClusterMetadataManager.getPhysicalClusterId(Mockito.anyLong())).thenReturn(null);
Assert.assertEquals(topicManagerService.addAuthority(authorityDO), ResultStatus.CLUSTER_NOT_EXIST);
}
@Test
public void listAllTest() {
// 准备mock返回数据
List<TopicDO> doList = new ArrayList<>();
doList.add(getTopicDO());
topicDao.insert(getTopicDO());
Mockito.when(topicDao.listAll()).thenReturn(doList);
List<TopicDO> topicDOS = topicManagerService.listAll();
Assert.assertFalse(topicDOS.isEmpty());
Assert.assertTrue(topicDOS.stream().allMatch(topicDO ->
topicDO.getClusterId().equals(getTopicDO().getClusterId()) &&
topicDO.getAppId().equals(getTopicDO().getAppId()) &&
topicDO.getTopicName().equals(getTopicDO().getTopicName())));
}
@Test
public void getByClusterIdFromCacheTest() {
// 返回空集合测试
getByClusterIdFromCache2ReturnEmptyListTest();
// 返回成功测试
getByClusterIdFromCache2SuccessTest();
}
private void getByClusterIdFromCache2ReturnEmptyListTest() {
Assert.assertTrue(topicManagerService.getByClusterIdFromCache(null).isEmpty());
}
private void getByClusterIdFromCache2SuccessTest() {
List<TopicDO> doList = new ArrayList<>();
doList.add(getTopicDO());
Mockito.when(topicDao.getByClusterIdFromCache(Mockito.anyLong())).thenReturn(doList);
Assert.assertEquals(topicManagerService.getByClusterIdFromCache(REAL_CLUSTER_ID_IN_MYSQL), doList);
}
@Test
public void getByClusterIdTest() {
// 返回空集合测试
getByClusterId2ReturnEmptyListTest();
// 返回成功测试
getByClusterId2SuccessTest();
}
private void getByClusterId2ReturnEmptyListTest() {
Assert.assertTrue(topicManagerService.getByClusterId(null).isEmpty());
}
private void getByClusterId2SuccessTest() {
List<TopicDO> doList = new ArrayList<>();
doList.add(getTopicDO());
Mockito.when(topicDao.getByClusterId(Mockito.anyLong())).thenReturn(doList);
Assert.assertEquals(topicManagerService.getByClusterId(REAL_CLUSTER_ID_IN_MYSQL), doList);
}
@Test
public void getByTopicNameTest() {
// 返回null测试
getByTopicName2ReturnNullTest();
// 成功测试
getByTopicName2SuccessTest();
}
private void getByTopicName2ReturnNullTest() {
Assert.assertNull(topicManagerService.getByTopicName(null, REAL_TOPIC_IN_ZK));
Assert.assertNull(topicManagerService.getByTopicName(REAL_CLUSTER_ID_IN_MYSQL, null));
}
private void getByTopicName2SuccessTest() {
TopicDO topicDOInCluster = getTopicDOInCluster();
Mockito.when(topicDao.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(topicDOInCluster);
Assert.assertEquals(topicManagerService.getByTopicName(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK), topicDOInCluster);
}
@Test
public void replaceTopicStatisticsTest() {
// 失败测试
Mockito.when(topicStatisticsDao.replace(Mockito.any())).thenReturn(0);
Assert.assertEquals(topicManagerService.replaceTopicStatistics(new TopicStatisticsDO()), 0);
// 成功测试
Mockito.when(topicStatisticsDao.replace(Mockito.any())).thenReturn(1);
Assert.assertEquals(topicManagerService.replaceTopicStatistics(new TopicStatisticsDO()), 1);
}
@Test
public void getTopicMaxAvgBytesInTest() {
// 返回空Map测试
getTopicMaxAvgBytesIn2ReturnEmptyMapTest();
// 返回成功测试
getTopicMaxAvgBytesIn2SuccessTest();
}
private void getTopicMaxAvgBytesIn2ReturnEmptyMapTest() {
List<TopicStatisticsDO> doList = new ArrayList<>();
Mockito.when(topicStatisticsDao.getTopicStatisticData(Mockito.anyLong(), Mockito.any(), Mockito.anyDouble())).thenReturn(doList);
Assert.assertTrue(topicManagerService.getTopicMaxAvgBytesIn(REAL_CLUSTER_ID_IN_MYSQL,1, 1.0).size() == 0);
}
private void getTopicMaxAvgBytesIn2SuccessTest() {
List<TopicStatisticsDO> doList = new ArrayList<>();
TopicStatisticsDO topicStatisticsDO = getTopicStatisticsDO();
doList.add(topicStatisticsDO);
Mockito.when(topicStatisticsDao.getTopicStatisticData(Mockito.anyLong(), Mockito.any(), Mockito.anyDouble())).thenReturn(doList);
Map<String, List<Double>> expectMap = new HashMap<>();
List<Double> list = new ArrayList<>();
list.add(topicStatisticsDO.getMaxAvgBytesIn());
expectMap.put(topicStatisticsDO.getTopicName(), list);
Map<String, List<Double>> actualMap = topicManagerService.getTopicMaxAvgBytesIn(REAL_CLUSTER_ID_IN_MYSQL, 1, 1.0);
Assert.assertTrue(actualMap.keySet().stream().allMatch(key -> actualMap.get(key).equals(expectMap.get(key))));
}
@Test
public void getTopicMaxAvgBytesInTest2() {
// 返回空测试
getTopicMaxAvgBytesInTest2NullTest();
// 成功测试
getTopicMaxAvgBytesInTest2SuccessTest();
}
private void getTopicMaxAvgBytesInTest2NullTest() {
Mockito.when(topicStatisticsDao.getTopicMaxAvgBytesIn(Mockito.anyLong(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyInt())).thenReturn(null);
Assert.assertEquals(topicManagerService.getTopicMaxAvgBytesIn(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK, new Date(), new Date(), 1), null);
}
private void getTopicMaxAvgBytesInTest2SuccessTest() {
Mockito.when(topicStatisticsDao.getTopicMaxAvgBytesIn(Mockito.anyLong(), Mockito.anyString(), Mockito.any(), Mockito.any(), Mockito.anyInt())).thenReturn(1.0);
Assert.assertEquals(topicManagerService.getTopicMaxAvgBytesIn(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK, new Date(), new Date(), 1), 1.0);
}
@Test
public void getByTopicAndDayTest() {
TopicStatisticsDO topicStatisticsDO = getTopicStatisticsDO();
Mockito.when(topicStatisticsDao.getByTopicAndDay(Mockito.anyLong(), Mockito.anyString(), Mockito.anyString())).thenReturn(topicStatisticsDO);
Assert.assertEquals(topicManagerService.getByTopicAndDay(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK, "2020-03-30"), topicStatisticsDO);
}
@Test
public void getExpiredTopicsTest() {
Mockito.when(topicExpiredDao.getExpiredTopics(Mockito.anyInt())).thenReturn(new ArrayList<>());
Assert.assertTrue(topicManagerService.getExpiredTopics(30).isEmpty());
}
private MineTopicSummary getMineTopicSummary() {
MineTopicSummary summary = new MineTopicSummary();
summary.setAppName(KAFKA_MANAGER_APP_NAME);
summary.setAccess(TopicAuthorityEnum.OWNER.getCode());
summary.setPhysicalClusterId(1L);
summary.setTopicName(REAL_TOPIC_IN_ZK);
return summary;
}
private TopicDO getRealTopicDO() {
TopicDO topicDO = new TopicDO();
topicDO.setAppId(KAFKA_MANAGER_APP_ID);
topicDO.setTopicName(REAL_TOPIC_IN_ZK);
topicDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
topicDO.setDescription("topic介绍");
topicDO.setAppId(KAFKA_MANAGER_APP_ID);
return topicDO;
}
private AppDO getAppDO() {
AppDO appDO = new AppDO();
appDO.setAppId(KAFKA_MANAGER_APP_ID);
appDO.setName(KAFKA_MANAGER_APP_NAME);
appDO.setPrincipals(ADMIN_NAME_IN_MYSQL);
appDO.setType(1);
return appDO;
}
@Test
public void getMyTopicsTest() {
MineTopicSummary mineTopicSummary = getMineTopicSummary();
List<TopicDO> topicDOList = new ArrayList<>();
TopicDO realTopicDO = getRealTopicDO();
topicDOList.add(realTopicDO);
List<AppDO> appDOList = new ArrayList<>();
AppDO appDO = getAppDO();
appDOList.add(appDO);
Mockito.when(topicDao.listAll()).thenReturn(topicDOList);
Mockito.when(topicDao.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(realTopicDO);
Mockito.when(appService.getByPrincipal(Mockito.anyString())).thenReturn(appDOList);
Assert.assertTrue(topicManagerService.getMyTopics(ADMIN_NAME_IN_MYSQL).stream().allMatch(summary ->
summary.getAccess().equals(mineTopicSummary.getAccess()) &&
summary.getAppName().equals(mineTopicSummary.getAppName()) &&
summary.getTopicName().equals(mineTopicSummary.getTopicName())));
}
@Test
public void getTopicsTest() {
// ClusterDO为空测试
getTopic2clusterDOListIsEmptyTest();
// appList为空测试
getTopic2AppListIsEmptyTest();
// 成功测试
getTopic2SuccessTest();
}
private TopicDTO getTopicDTO() {
TopicDTO topicDTO = new TopicDTO();
topicDTO.setAppId(KAFKA_MANAGER_APP_ID);
topicDTO.setAppName(KAFKA_MANAGER_APP_NAME);
topicDTO.setAppPrincipals(ADMIN_NAME_IN_MYSQL);
topicDTO.setTopicName(REAL_TOPIC_IN_ZK);
return topicDTO;
}
private void getTopic2clusterDOListIsEmptyTest() {
Mockito.when(clusterService.listAll()).thenReturn(new ArrayList<>());
Assert.assertTrue(topicManagerService.getTopics(ADMIN_NAME_IN_MYSQL).isEmpty());
}
private void getTopic2AppListIsEmptyTest() {
List<ClusterDO> clusterDOList = new ArrayList<>();
clusterDOList.add(new ClusterDO());
Mockito.when(clusterService.listAll()).thenReturn(clusterDOList);
Mockito.when(appService.listAll()).thenReturn(new ArrayList<>());
Assert.assertTrue(topicManagerService.getTopics(ADMIN_NAME_IN_MYSQL).isEmpty());
}
private void getTopic2SuccessTest() {
List<ClusterDO> clusterDOList = new ArrayList<>();
ClusterDO clusterDO = getClusterDO();
clusterDOList.add(clusterDO);
List<AppDO> appDOList = new ArrayList<>();
AppDO appDO = getAppDO();
appDOList.add(appDO);
TopicDTO topicDTO = getTopicDTO();
LogicalClusterDO logicalClusterDO = getLogicalClusterDO();
Mockito.when(clusterService.listAll()).thenReturn(clusterDOList);
Mockito.when(appService.listAll()).thenReturn(appDOList);
Mockito.when(logicalClusterMetadataManager.getTopicLogicalCluster(Mockito.anyLong(), Mockito.anyString())).thenReturn(logicalClusterDO);
Assert.assertTrue(topicManagerService.getTopics(ADMIN_NAME_IN_MYSQL).stream().allMatch(dto ->
dto.getAppId().equals(topicDTO.getAppId()) &&
dto.getAppName().equals(topicDTO.getAppName()) &&
dto.getTopicName().equals(topicDTO.getTopicName())));
}
@Test
public void getTopicAuthorizedAppsTest() {
// Topic不存在测试
getTopicAuthorizedApps2TopicNotExistTest();
// 没有权限测试
getTopicAuthorizedApps2NoAuthorityTest();
// 成功测试
getTopicAuthorizedApps2successTest();
}
private void getTopicAuthorizedApps2TopicNotExistTest() {
Assert.assertTrue(topicManagerService.getTopicAuthorizedApps(-1L, "notExistTopic").isEmpty());
}
private void getTopicAuthorizedApps2NoAuthorityTest() {
Mockito.when(authorityService.getAuthorityByTopic(Mockito.anyLong(), Mockito.anyString())).thenReturn(new ArrayList<>());
Assert.assertTrue(topicManagerService.getTopicAuthorizedApps(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK).isEmpty());
}
private void getTopicAuthorizedApps2successTest() {
AuthorityDO authorityDO = getAuthorityDO();
List<AuthorityDO> authorityDOList = new ArrayList<>();
authorityDOList.add(authorityDO);
Mockito.when(authorityService.getAuthorityByTopic(Mockito.anyLong(), Mockito.anyString())).thenReturn(authorityDOList);
List<TopicThrottledMetrics> topicThrottledMetricsList = new ArrayList<>();
TopicThrottledMetrics topicThrottledMetrics = getTopicThrottledMetrics();
topicThrottledMetricsList.add(topicThrottledMetrics);
Mockito.when(throttleService.getThrottledTopicsFromJmx(Mockito.anyLong(), Mockito.anySet(), Mockito.anyList())).thenReturn(topicThrottledMetricsList);
AppDO appDO = getAppDO();
Mockito.when(appService.getByAppId(Mockito.anyString())).thenReturn(appDO);
TopicAppData topicAppData = getTopicAppData();
Assert.assertTrue(topicManagerService.getTopicAuthorizedApps(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK).stream().allMatch(data ->
data.getAccess().equals(topicAppData.getAccess()) &&
data.getAppId().equals(topicAppData.getAppId()) &&
data.getTopicName().equals(topicAppData.getTopicName()) &&
data.getAppName().equals(topicAppData.getAppName()) &&
data.getAppPrincipals().equals(topicAppData.getAppPrincipals())));
}
@Test
public void getTopicMineAppsTest() {
// topic不存在测试
getTopicMineApps2TopicNotExistTest();
// appDOList为空测试
getTopicMineApps2AppDOListIsEmptyTest();
// 成功测试
getTopicMineApps2Success();
}
private void getTopicMineApps2TopicNotExistTest() {
Assert.assertTrue(topicManagerService.getTopicMineApps(-1L, "notExistTopic", ADMIN_NAME_IN_MYSQL).isEmpty());
}
private void getTopicMineApps2AppDOListIsEmptyTest() {
Mockito.when(appService.getByPrincipal(Mockito.anyString())).thenReturn(new ArrayList<>());
Assert.assertTrue(topicManagerService.getTopicMineApps(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK, ADMIN_NAME_IN_MYSQL).isEmpty());
}
private void getTopicMineApps2Success() {
AppDO appDO = getAppDO();
List<AppDO> appDOList = new ArrayList<>();
appDOList.add(appDO);
Mockito.when(appService.getByPrincipal(Mockito.anyString())).thenReturn(appDOList);
AuthorityDO authorityDO = getAuthorityDO();
List<AuthorityDO> authorityDOList = new ArrayList<>();
authorityDOList.add(authorityDO);
Mockito.when(authorityService.getAuthorityByTopic(Mockito.anyLong(), Mockito.anyString())).thenReturn(authorityDOList);
List<TopicThrottledMetrics> topicThrottledMetricsList = new ArrayList<>();
TopicThrottledMetrics topicThrottledMetrics = getTopicThrottledMetrics();
topicThrottledMetricsList.add(topicThrottledMetrics);
Mockito.when(throttleService.getThrottledTopicsFromJmx(Mockito.anyLong(), Mockito.anySet(), Mockito.anyList())).thenReturn(topicThrottledMetricsList);
System.out.println(topicManagerService.getTopicMineApps(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK, ADMIN_NAME_IN_MYSQL));
TopicAppData topicAppData = getTopicAppData();
Assert.assertTrue(topicManagerService.getTopicMineApps(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK, ADMIN_NAME_IN_MYSQL).stream().allMatch(data ->
data.getAppName().equals(topicAppData.getAppName()) &&
data.getTopicName().equals(topicAppData.getTopicName()) &&
data.getConsumerQuota().equals(topicAppData.getConsumerQuota())));
}
@Test
public void getRdTopicBasicTest() {
// 集群不存在测试
getRdTopicBasic2ClusterNotExistTest();
// 成功测试
getRdTopicBasic2SuccessTest();
}
private void getRdTopicBasic2ClusterNotExistTest() {
Mockito.when(clusterService.getById(Mockito.anyLong())).thenReturn(null);
Assert.assertEquals(topicManagerService.getRdTopicBasic(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK).toString(), Result.buildFrom(ResultStatus.CLUSTER_NOT_EXIST).toString());
}
private void getRdTopicBasic2SuccessTest() {
ClusterDO clusterDO = getClusterDO();
Mockito.when(clusterService.getById(REAL_CLUSTER_ID_IN_MYSQL)).thenReturn(clusterDO);
RegionDO regionDO = getRegionDO();
List<RegionDO> regionDOList = new ArrayList<>();
regionDOList.add(regionDO);
Mockito.when(regionService.getRegionListByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(regionDOList);
AppDO appDO = getAppDO();
Mockito.when(appService.getByAppId(Mockito.anyString())).thenReturn(appDO);
TopicDO topicDO = getTopicDOInCluster();
Mockito.when(topicDao.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(topicDO);
RdTopicBasic actualResult = topicManagerService.getRdTopicBasic(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK).getData();
RdTopicBasic expectedResult = getRdTopicBasic();
Assert.assertNotNull(actualResult);
Assert.assertTrue(actualResult.getAppId().equals(expectedResult.getAppId()) &&
actualResult.getAppName().equals(expectedResult.getAppName()) &&
actualResult.getRegionNameList().equals(expectedResult.getRegionNameList()));
}
@Test
public void getTopicBusinessInfoTest() {
// TopicDO为null测试
getRdTopicBasic2SuccessTest();
// AppDO为null测试
getTopicBusinessInfo2AppDOIsNullTest();
// 成功测试
getTopicBusinessInfo2SuccessTest();
}
private void getTopicBusinessInfo2ReturnNullTest() {
Mockito.when(topicDao.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(null);
Assert.assertNull(topicManagerService.getTopicBusinessInfo(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK));
}
private void getTopicBusinessInfo2AppDOIsNullTest() {
TopicDO topicDO = getTopicDOInCluster();
Mockito.when(topicDao.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(topicDO);
Mockito.when(appService.getByAppId(Mockito.anyString())).thenReturn(null);
TopicBusinessInfo expected = getTopicBusinessInfo();
expected.setAppId(null);
expected.setAppName(null);
expected.setPrincipals(null);
Assert.assertEquals(topicManagerService.getTopicBusinessInfo(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK).toString(), expected.toString());
}
private void getTopicBusinessInfo2SuccessTest() {
TopicDO topicDO = getTopicDOInCluster();
Mockito.when(topicDao.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(topicDO);
AppDO appDO = getAppDO();
Mockito.when(appService.getByAppId(Mockito.anyString())).thenReturn(appDO);
TopicBusinessInfo expected = getTopicBusinessInfo();
Assert.assertEquals(topicManagerService.getTopicBusinessInfo(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK).toString(), expected.toString());
}
@Test
public void getTopicStatisticTest() {
TopicStatisticsDO topicStatisticsDO = getTopicStatisticsDO();
List<TopicStatisticsDO> topicStatisticsDOList = new ArrayList<>();
topicStatisticsDOList.add(topicStatisticsDO);
Mockito.when(topicStatisticsDao.getTopicStatistic(Mockito.anyLong(), Mockito.anyString(), Mockito.any(), Mockito.any())).thenReturn(topicStatisticsDOList);
Assert.assertTrue(topicManagerService.getTopicStatistic(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC_IN_ZK, new Date(), new Date()).stream().allMatch(statisticsDO ->
statisticsDO.getClusterId().equals(topicStatisticsDO.getClusterId()) &&
statisticsDO.getMaxAvgBytesIn().equals(topicStatisticsDO.getMaxAvgBytesIn()) &&
statisticsDO.getOffsetSum().equals(topicStatisticsDO.getOffsetSum())));
}
@Test
public void addAuthorityTest() {
// app不存在测试
addAuthority2AppNotExistTest();
// cluster不存在测试
// addAuthority2ClusterNotExistTest();
}
private void addAuthority2AppNotExistTest() {
AuthorityDO authorityDO = getAuthorityDO();
// Mockito.when(appService.getByPrincipal(Mockito.anyString())).thenReturn(new ArrayList<>());
Assert.assertEquals(topicManagerService.addAuthority(authorityDO), ResultStatus.APP_NOT_EXIST);
}
}