From 37aa526404a6d1160e36f11ecdeb2125a50e2c2f Mon Sep 17 00:00:00 2001 From: xuguang Date: Thu, 6 Jan 2022 20:00:25 +0800 Subject: [PATCH] =?UTF-8?q?=E5=8D=95=E5=85=83=E6=B5=8B=E8=AF=95=EF=BC=9ACl?= =?UTF-8?q?usterHostTaskServiceTest=EF=BC=8CClusterRoleTaskServiceTest,Clu?= =?UTF-8?q?sterTaskServiceTest,KafkaFileServiceTest,TopicCommandsTest,Topi?= =?UTF-8?q?cReassignUtilsTest?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/service/AdminServiceTest.java | 375 ++++++++++++++++++ .../service/service/ExpertServiceTest.java | 80 ++++ .../service/gateway/AppServiceTest.java | 23 +- .../gateway/TopicConnectionServiceTest.java | 85 ++++ .../service/utils/TopicCommandsTest.java | 234 +++++++++++ .../service/utils/TopicReassignUtilsTest.java | 78 ++++ .../kcm/ClusterHostTaskServiceTest.java | 80 ++++ .../kcm/ClusterRoleTaskServiceTest.java | 74 ++++ .../manager/kcm/ClusterTaskServiceTest.java | 348 +++++++++++++++- .../manager/kcm/KafkaFileServiceTest.java | 224 +++++++++++ 10 files changed, 1589 insertions(+), 12 deletions(-) create mode 100644 kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/AdminServiceTest.java create mode 100644 kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/ExpertServiceTest.java create mode 100644 kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/gateway/TopicConnectionServiceTest.java create mode 100644 kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/utils/TopicCommandsTest.java create mode 100644 kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/utils/TopicReassignUtilsTest.java create mode 100644 kafka-manager-extends/kafka-manager-kcm/src/test/java/com/xiaojukeji/kafka/manager/kcm/ClusterHostTaskServiceTest.java create mode 100644 kafka-manager-extends/kafka-manager-kcm/src/test/java/com/xiaojukeji/kafka/manager/kcm/ClusterRoleTaskServiceTest.java create mode 100644 kafka-manager-extends/kafka-manager-kcm/src/test/java/com/xiaojukeji/kafka/manager/kcm/KafkaFileServiceTest.java diff --git a/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/AdminServiceTest.java b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/AdminServiceTest.java new file mode 100644 index 00000000..a21a8f8d --- /dev/null +++ b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/AdminServiceTest.java @@ -0,0 +1,375 @@ +package com.xiaojukeji.kafka.manager.service.service; + +import com.xiaojukeji.kafka.manager.common.bizenum.TaskStatusEnum; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO; +import com.xiaojukeji.kafka.manager.service.config.BaseTest; +import org.springframework.beans.factory.annotation.Autowired; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.Date; +import java.util.Properties; + +/** + * @author xuguang + * @Date 2021/12/24 + */ +public class AdminServiceTest extends BaseTest { + + /** + * 集群共包括三个broker:1,2,3, 该topic 1分区 1副本因子,在broker1上 + */ + private final static String REAL_TOPIC1_IN_ZK = "moduleTest"; + + /** + * 集群共包括三个broker:1,2,3, 该topic 2分区 3副本因子,在broker1,2,3上 + */ + private final static String REAL_TOPIC2_IN_ZK = "xgTest"; + + private final static String INVALID_TOPIC = "xxxxx"; + + private final static String ZK_DEFAULT_TOPIC = "_consumer_offsets"; + + private final static String CREATE_TOPIC_TEST = "createTopicTest"; + + private final static Long REAL_CLUSTER_ID_IN_MYSQL = 1L; + + private final static Integer REAL_BROKER_ID_IN_ZK = 1; + + private final static Long INVALID_CLUSTER_ID = -1L; + + private final static Integer INVALID_PARTITION_ID = -1; + + private final static Integer REAL_PARTITION_ID = 0; + + private final static Integer INVALID_BROKER_ID = -1; + + private final static String APP_ID = "dkm_admin"; + + private final static Long INVALID_REGION_ID = -1L; + + private final static Long REAL_REGION_ID_IN_MYSQL = 1L; + + private final static String ADMIN = "admin"; + + + @Autowired + private AdminService adminService; + + @Autowired + private TopicManagerService topicManagerService; + + private TopicDO getTopicDO() { + TopicDO topicDO = new TopicDO(); + topicDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL); + topicDO.setTopicName(CREATE_TOPIC_TEST); + topicDO.setAppId(APP_ID); + topicDO.setDescription(CREATE_TOPIC_TEST); + topicDO.setPeakBytesIn(100000L); + return topicDO; + } + + public ClusterDO getClusterDO() { + ClusterDO clusterDO = new ClusterDO(); + clusterDO.setId(REAL_CLUSTER_ID_IN_MYSQL); + clusterDO.setClusterName("LogiKM_moduleTest"); + clusterDO.setZookeeper("10.190.46.198:2181,10.190.14.237:2181,10.190.50.65:2181/xg"); + clusterDO.setBootstrapServers("10.190.46.198:9093,10.190.14.237:9093,10.190.50.65:9093"); + clusterDO.setSecurityProperties("{ \t\"security.protocol\": \"SASL_PLAINTEXT\", \t\"sasl.mechanism\": \"PLAIN\", \t\"sasl.jaas.config\": \"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\"dkm_admin\\\" password=\\\"km_kMl4N8as1Kp0CCY\\\";\" }"); + clusterDO.setStatus(1); + clusterDO.setGmtCreate(new Date()); + clusterDO.setGmtModify(new Date()); + return clusterDO; + } + + @Test(description = "测试创建topic") + public void createTopicTest() { + // broker not exist + createTopic2BrokerNotExistTest(); + // success to create topic + createTopic2SuccessTest(); + // failure to create topic, topic already exists + createTopic2FailureTest(); + } + + private void createTopic2BrokerNotExistTest() { + TopicDO topicDO = getTopicDO(); + ClusterDO clusterDO = getClusterDO(); + ResultStatus result = adminService.createTopic( + clusterDO, + topicDO, + 1, + 1, + 1L, + Arrays.asList(INVALID_BROKER_ID), + new Properties(), + ADMIN, + ADMIN); + Assert.assertEquals(result.getCode(), ResultStatus.BROKER_NOT_EXIST.getCode()); + } + + private void createTopic2FailureTest() { + TopicDO topicDO = getTopicDO(); + ClusterDO clusterDO = getClusterDO(); + ResultStatus result = adminService.createTopic( + clusterDO, + topicDO, + 1, + 1, + INVALID_REGION_ID, + Arrays.asList(REAL_BROKER_ID_IN_ZK), + new Properties(), + ADMIN, + ADMIN); + Assert.assertNotEquals(result.getCode(), ResultStatus.SUCCESS.getCode()); + } + + private void createTopic2SuccessTest() { + TopicDO topicDO = getTopicDO(); + ClusterDO clusterDO = getClusterDO(); + ResultStatus result = adminService.createTopic( + clusterDO, + topicDO, + 1, + 1, + INVALID_REGION_ID, + Arrays.asList(REAL_BROKER_ID_IN_ZK), + new Properties(), + ADMIN, + ADMIN); + Assert.assertEquals(result.getCode(), ResultStatus.SUCCESS.getCode()); + } + + @Test(description = "测试删除topic") + public void deleteTopicTest() { + // topic does not exist + deleteTopic2FailureTest(); + // success to delete + deleteTopic2SuccessTest(); + } + + private void deleteTopic2FailureTest() { + ClusterDO clusterDO = getClusterDO(); + ResultStatus resultStatus = adminService.deleteTopic( + clusterDO, + INVALID_TOPIC, + ADMIN + ); + Assert.assertNotEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode()); + } + + private void deleteTopic2SuccessTest() { + TopicDO topicDO = getTopicDO(); + topicManagerService.addTopic(topicDO); + + ClusterDO clusterDO = getClusterDO(); + ResultStatus resultStatus = adminService.deleteTopic( + clusterDO, + CREATE_TOPIC_TEST, + ADMIN + ); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode()); + } + + @Test(description = "测试优先副本选举状态") + public void preferredReplicaElectionStatusTest() { + // running + preferredReplicaElectionStatus2RunningTest(); + // not running + preferredReplicaElectionStatus2NotRunningTest(); + } + + private void preferredReplicaElectionStatus2RunningTest() { + // zk上需要创建/admin/preferred_replica_election节点 + ClusterDO clusterDO = getClusterDO(); + TaskStatusEnum taskStatusEnum = adminService.preferredReplicaElectionStatus(clusterDO); + Assert.assertEquals(taskStatusEnum.getCode(), TaskStatusEnum.RUNNING.getCode()); + } + + private void preferredReplicaElectionStatus2NotRunningTest() { + ClusterDO clusterDO = getClusterDO(); + // zk上无/admin/preferred_replica_election节点 + TaskStatusEnum taskStatusEnum = adminService.preferredReplicaElectionStatus(clusterDO); + Assert.assertEquals(taskStatusEnum.getCode(), TaskStatusEnum.SUCCEED.getCode()); + } + + @Test(description = "测试集群纬度优先副本选举") + public void preferredReplicaElectionOfCluster2Test() { + ClusterDO clusterDO = getClusterDO(); + ResultStatus resultStatus = adminService.preferredReplicaElection(clusterDO, ADMIN); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode()); + } + + @Test(description = "Broker纬度优先副本选举") + public void preferredReplicaElectionOfBrokerTest() { + // 参数异常 + preferredReplicaElectionOfBroker2ParamIllegalTest(); + // success + preferredReplicaElectionOfBroker2SuccessTest(); + } + + private void preferredReplicaElectionOfBroker2ParamIllegalTest() { + ClusterDO clusterDO = getClusterDO(); + ResultStatus resultStatus = adminService.preferredReplicaElection( + clusterDO, + INVALID_BROKER_ID, + ADMIN + ); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.PARAM_ILLEGAL.getCode()); + } + + private void preferredReplicaElectionOfBroker2SuccessTest() { + ClusterDO clusterDO = getClusterDO(); + ResultStatus resultStatus = adminService.preferredReplicaElection( + clusterDO, + REAL_BROKER_ID_IN_ZK, + ADMIN + ); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode()); + } + + @Test(description = "Topic纬度优先副本选举") + public void preferredReplicaElectionOfTopicTest() { + // topic not exist + preferredReplicaElectionOfTopic2TopicNotExistTest(); + // success + preferredReplicaElectionOfTopic2SuccessTest(); + } + + private void preferredReplicaElectionOfTopic2TopicNotExistTest() { + ClusterDO clusterDO = getClusterDO(); + ResultStatus resultStatus = adminService.preferredReplicaElection( + clusterDO, + INVALID_TOPIC, + ADMIN + ); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.TOPIC_NOT_EXIST.getCode()); + } + + private void preferredReplicaElectionOfTopic2SuccessTest() { + ClusterDO clusterDO = getClusterDO(); + ResultStatus resultStatus = adminService.preferredReplicaElection( + clusterDO, + REAL_TOPIC1_IN_ZK, + ADMIN + ); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode()); + } + + @Test(description = "Topic纬度优先副本选举") + public void preferredReplicaElectionOfPartitionTest() { + // topic not exist + preferredReplicaElectionOfPartition2TopicNotExistTest(); + // partition Not Exist + preferredReplicaElectionOfPartition2PartitionNotExistTest(); + // success + preferredReplicaElectionOfPartition2SuccessTest(); + } + + private void preferredReplicaElectionOfPartition2TopicNotExistTest() { + ClusterDO clusterDO = getClusterDO(); + ResultStatus resultStatus = adminService.preferredReplicaElection( + clusterDO, + INVALID_TOPIC, + INVALID_PARTITION_ID, + ADMIN + ); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.TOPIC_NOT_EXIST.getCode()); + } + + private void preferredReplicaElectionOfPartition2PartitionNotExistTest() { + ClusterDO clusterDO = getClusterDO(); + ResultStatus resultStatus = adminService.preferredReplicaElection( + clusterDO, + REAL_TOPIC2_IN_ZK, + INVALID_PARTITION_ID, + ADMIN + ); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.PARTITION_NOT_EXIST.getCode()); + } + + private void preferredReplicaElectionOfPartition2SuccessTest() { + ClusterDO clusterDO = getClusterDO(); + ResultStatus resultStatus = adminService.preferredReplicaElection( + clusterDO, + REAL_TOPIC2_IN_ZK, + REAL_PARTITION_ID, + ADMIN + ); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode()); + } + + @Test(description = "测试获取Topic配置") + public void getTopicConfigTest() { + // result is null + getTopicConfig2NullTest(); + // result not null + getTopicConfig2NotNullTest(); + } + + private void getTopicConfig2NullTest() { + ClusterDO clusterDO = getClusterDO(); + clusterDO.setId(INVALID_CLUSTER_ID); + Properties topicConfig = adminService.getTopicConfig(clusterDO, REAL_TOPIC1_IN_ZK); + Assert.assertNull(topicConfig); + } + + private void getTopicConfig2NotNullTest() { + ClusterDO clusterDO = getClusterDO(); + Properties topicConfig = adminService.getTopicConfig(clusterDO, REAL_TOPIC1_IN_ZK); + Assert.assertNotNull(topicConfig); + } + + @Test(description = "测试修改Topic配置") + public void modifyTopicConfigTest() { + ClusterDO clusterDO = getClusterDO(); + Properties properties = new Properties(); + properties.put("retention.ms", "21600000"); + ResultStatus resultStatus = adminService.modifyTopicConfig( + clusterDO, + REAL_TOPIC1_IN_ZK, + properties, + ADMIN + ); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode()); + } + + @Test(description = "测试扩分区") + public void expandPartitionsTest() { + // broker not exist + expandPartitions2BrokerNotExistTest(); + // success + expandPartitions2SuccessTest(); + } + + private void expandPartitions2BrokerNotExistTest() { + // 存在两个下线broker, region中包含一个 + ClusterDO clusterDO = getClusterDO(); + ResultStatus resultStatus = adminService.expandPartitions( + clusterDO, + REAL_TOPIC1_IN_ZK, + 2, + REAL_REGION_ID_IN_MYSQL, + Arrays.asList(INVALID_BROKER_ID), + ADMIN + ); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.BROKER_NOT_EXIST.getCode()); + } + + private void expandPartitions2SuccessTest() { + ClusterDO clusterDO = getClusterDO(); + ResultStatus resultStatus = adminService.expandPartitions( + clusterDO, + REAL_TOPIC1_IN_ZK, + 2, + INVALID_REGION_ID, + Arrays.asList(REAL_BROKER_ID_IN_ZK), + ADMIN + ); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.BROKER_NOT_EXIST.getCode()); + } + +} diff --git a/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/ExpertServiceTest.java b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/ExpertServiceTest.java new file mode 100644 index 00000000..3c447e07 --- /dev/null +++ b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/ExpertServiceTest.java @@ -0,0 +1,80 @@ +package com.xiaojukeji.kafka.manager.service.service; + +import com.xiaojukeji.kafka.manager.common.entity.ao.config.expert.RegionTopicHotConfig; +import com.xiaojukeji.kafka.manager.common.entity.ao.expert.TopicRegionHot; +import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; +import com.xiaojukeji.kafka.manager.service.config.BaseTest; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.springframework.beans.factory.annotation.Autowired; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.List; + +/** + * @author xuguang + * Collections.EmptyList() + * @Date 2021/12/24 + */ +public class ExpertServiceTest extends BaseTest { + + private final static Long REAL_CLUSTER_ID_IN_MYSQL = 1L; + + private final static Long INVALID_CLUSTER_ID = -1L; + + @Autowired + @InjectMocks + private ExpertService expertService; + + @Mock + private ConfigService configService; + + @Mock + private ClusterService clusterService; + + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + } + + private RegionTopicHotConfig getRegionTopicHotConfig() { + RegionTopicHotConfig regionTopicHotConfig = new RegionTopicHotConfig(); + regionTopicHotConfig.setMaxDisPartitionNum(5); + regionTopicHotConfig.setIgnoreClusterIdList(Arrays.asList(INVALID_CLUSTER_ID)); + return regionTopicHotConfig; + } + + public ClusterDO getClusterDO() { + ClusterDO clusterDO = new ClusterDO(); + clusterDO.setId(REAL_CLUSTER_ID_IN_MYSQL); + clusterDO.setClusterName("LogiKM_moduleTest"); + clusterDO.setZookeeper("10.190.46.198:2181,10.190.14.237:2181,10.190.50.65:2181/xg"); + clusterDO.setBootstrapServers("10.190.46.198:9093,10.190.14.237:9093,10.190.50.65:9093"); + clusterDO.setSecurityProperties("{ \t\"security.protocol\": \"SASL_PLAINTEXT\", \t\"sasl.mechanism\": \"PLAIN\", \t\"sasl.jaas.config\": \"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\"dkm_admin\\\" password=\\\"km_kMl4N8as1Kp0CCY\\\";\" }"); + clusterDO.setStatus(1); + clusterDO.setGmtCreate(new Date()); + clusterDO.setGmtModify(new Date()); + return clusterDO; + } + + @Test(description = "测试Region内热点Topic") + public void getRegionHotTopics() { + RegionTopicHotConfig regionTopicHotConfig = getRegionTopicHotConfig(); + Mockito.when(configService.getByKey(Mockito.anyString(), Mockito.any())).thenReturn(regionTopicHotConfig); + ClusterDO clusterDO1 = getClusterDO(); + ClusterDO clusterDO2 = getClusterDO(); + clusterDO2.setId(INVALID_CLUSTER_ID); + Mockito.when(clusterService.list()).thenReturn(Arrays.asList(clusterDO1, clusterDO2)); + + List regionHotTopics = expertService.getRegionHotTopics(); + Assert.assertFalse(regionHotTopics.isEmpty()); + } + +} diff --git a/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/gateway/AppServiceTest.java b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/gateway/AppServiceTest.java index 88c8f433..68c38037 100644 --- a/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/gateway/AppServiceTest.java +++ b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/gateway/AppServiceTest.java @@ -27,13 +27,13 @@ public class AppServiceTest extends BaseTest { public static Object[][] provideAppDO() { AppDO appDO = new AppDO(); appDO.setId(4L); - appDO.setAppId("testAppId"); - appDO.setName("testApp"); - appDO.setPassword("testApp"); + appDO.setAppId("moduleTestAppId"); + appDO.setName("moduleTestApp"); + appDO.setPassword("moduleTestApp"); appDO.setType(1); appDO.setApplicant("admin"); appDO.setPrincipals("admin"); - appDO.setDescription("testApp"); + appDO.setDescription("moduleTestApp"); appDO.setCreateTime(new Date(1638786493173L)); appDO.setModifyTime(new Date(1638786493173L)); return new Object[][] {{appDO}}; @@ -59,7 +59,6 @@ public class AppServiceTest extends BaseTest { addApp2MysqlErrorTest(); } - @Rollback(false) private void addApp2SuccessTest(AppDO appDO) { ResultStatus addAppResult = appService.addApp(appDO, "admin"); Assert.assertEquals(addAppResult.getCode(), ResultStatus.SUCCESS.getCode()); @@ -70,7 +69,6 @@ public class AppServiceTest extends BaseTest { Assert.assertEquals(addAppResult.getCode(), ResultStatus.RESOURCE_ALREADY_EXISTED.getCode()); } - @Rollback() private void addApp2MysqlErrorTest() { ResultStatus addAppResult = appService.addApp(new AppDO(), "admin"); Assert.assertEquals(addAppResult.getCode(), ResultStatus.MYSQL_ERROR.getCode()); @@ -78,20 +76,24 @@ public class AppServiceTest extends BaseTest { @Test(dataProvider = "provideAppDO") public void deleteAppTest(AppDO appDO) { + appService.addApp(appDO, "admin"); + // 测试删除app成功 deleteApp2SuccessTest(appDO); // 测试删除app失败 - deleteApp2FailureTest(); + deleteApp2FailureTest(appDO); } - @Rollback() private void deleteApp2SuccessTest(AppDO appDO) { + appService.addApp(appDO, "admin"); + int result = appService.deleteApp(appDO, "admin"); Assert.assertEquals(result, 1); } - @Rollback() - private void deleteApp2FailureTest() { + private void deleteApp2FailureTest(AppDO appDO) { + appService.addApp(appDO, "admin"); + int result = appService.deleteApp(new AppDO(), "admin"); Assert.assertEquals(result, 0); } @@ -116,7 +118,6 @@ public class AppServiceTest extends BaseTest { Assert.assertEquals(result.getCode(), ResultStatus.USER_WITHOUT_AUTHORITY.getCode()); } - @Rollback() private void updateByAppId2SucessTest(AppDTO appDTO) { ResultStatus result1 = appService.updateByAppId(appDTO, "admin", false); Assert.assertEquals(result1.getCode(), ResultStatus.SUCCESS.getCode()); diff --git a/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/gateway/TopicConnectionServiceTest.java b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/gateway/TopicConnectionServiceTest.java new file mode 100644 index 00000000..12933340 --- /dev/null +++ b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/gateway/TopicConnectionServiceTest.java @@ -0,0 +1,85 @@ +package com.xiaojukeji.kafka.manager.service.service.gateway; + +import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection; +import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO; +import com.xiaojukeji.kafka.manager.service.config.BaseTest; +import org.springframework.beans.factory.annotation.Autowired; +import org.testng.Assert; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.Date; +import java.util.List; + +/** + * @author xuguang + * @Date 2021/12/7 + */ +public class TopicConnectionServiceTest extends BaseTest { + + @Autowired + private TopicConnectionService topicConnectionService; + + private static final String TOPIC_NAME = "moduleTest"; + + private static final Long CLUSTER_ID = 1L; + + private static final String APP_ID = "dkm_admin"; + + @DataProvider(name = "provideTopicConnection") + public static Object[][] provideTopicConnection() { + TopicConnectionDO topicConnectionDO = new TopicConnectionDO(); + topicConnectionDO.setId(13L); + topicConnectionDO.setAppId(APP_ID); + topicConnectionDO.setClusterId(CLUSTER_ID); + topicConnectionDO.setTopicName(TOPIC_NAME); + topicConnectionDO.setType("fetch"); + topicConnectionDO.setIp("172.23.142.253"); + topicConnectionDO.setClientVersion("2.4"); + topicConnectionDO.setCreateTime(new Date(1638786493173L)); + return new Object[][] {{topicConnectionDO}}; + } + + // 测试批量插入为空的情况 + @Test + private void batchAdd2EmptyTest() { + topicConnectionService.batchAdd(new ArrayList<>()); + } + + // 测试批量插入成功的情况,通过调整list的数量和TopicConnectionServiceImpl中splitInterval的数量,使每个流程都测试一遍 + @Test(dataProvider = "provideTopicConnection") + private void batchAdd2SuccessTest(TopicConnectionDO topicConnectionDO) { + List list = new ArrayList<>(); + list.add(topicConnectionDO); + list.add(topicConnectionDO); + list.add(topicConnectionDO); + topicConnectionService.batchAdd(list); + } + + @Test(dataProvider = "provideTopicConnection") + public void getByTopicName2Test(TopicConnectionDO topicConnectionDO) { + List result = topicConnectionService.getByTopicName(CLUSTER_ID, TOPIC_NAME, new Date(0L), new Date()); + Assert.assertEquals(result.size(), 1); + Assert.assertEquals(result.get(0).toString(), topicConnectionDO.toString()); + } + + // 测试获取数据时为空 + @Test + public void getByTopicName2EmptyTest() { + List result = topicConnectionService.getByTopicName(100L, "xgTestxxx", new Date(0L), new Date()); + Assert.assertTrue(result.isEmpty()); + } + + // 测试获取数据,clusterId不为null,TODO + @Test(dataProvider = "provideTopicConnection") + public void getByTopicName2SuccessTest(TopicConnectionDO topicConnectionDO) { + List list = new ArrayList<>(); + list.add(topicConnectionDO); + topicConnectionService.batchAdd(list); + + List result = topicConnectionService.getByTopicName(CLUSTER_ID, TOPIC_NAME, new Date(0L), new Date()); + Assert.assertTrue(result.stream().anyMatch(topicConnection -> topicConnection.getTopicName().equals(TOPIC_NAME) + && topicConnection.getClusterId().equals(CLUSTER_ID))); + } +} diff --git a/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/utils/TopicCommandsTest.java b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/utils/TopicCommandsTest.java new file mode 100644 index 00000000..9ca102c3 --- /dev/null +++ b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/utils/TopicCommandsTest.java @@ -0,0 +1,234 @@ +package com.xiaojukeji.kafka.manager.service.utils; + +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; +import com.xiaojukeji.kafka.manager.service.config.BaseTest; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.Properties; + +/** + * @author xuguang + * @Date 2022/1/6 + */ +public class TopicCommandsTest extends BaseTest { + + private final static Long REAL_CLUSTER_ID_IN_MYSQL = 1L; + + private final static String TEST_CREATE_TOPIC = "createTopicTest"; + + private final static String REAL_TOPIC_IN_ZK = "moduleTest"; + + private final static String INVALID_TOPIC = ".,&"; + + private final static Integer PARTITION_NUM = 1; + + private final static Integer REPLICA_NUM = 1; + + private final static Integer BROKER_ID = 1; + + public ClusterDO getClusterDO() { + ClusterDO clusterDO = new ClusterDO(); + clusterDO.setId(REAL_CLUSTER_ID_IN_MYSQL); + clusterDO.setClusterName("LogiKM_moduleTest"); + clusterDO.setZookeeper("10.190.46.198:2181,10.190.14.237:2181,10.190.50.65:2181/xg"); + clusterDO.setBootstrapServers("10.190.46.198:9093,10.190.14.237:9093,10.190.50.65:9093"); + clusterDO.setSecurityProperties("{ \t\"security.protocol\": \"SASL_PLAINTEXT\", \t\"sasl.mechanism\": \"PLAIN\", \t\"sasl.jaas.config\": \"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\"dkm_admin\\\" password=\\\"km_kMl4N8as1Kp0CCY\\\";\" }"); + clusterDO.setStatus(1); + clusterDO.setGmtCreate(new Date()); + clusterDO.setGmtModify(new Date()); + return clusterDO; + } + + + @Test(description = "测试创建topic") + public void createTopicTest() { + // NullPointerException + createTopic2NullPointerExceptionTest(); + // InvalidPartitions + createTopic2InvalidPartitionsTest(); + // InvalidReplicationFactor + createTopic2InvalidReplicationFactorTest(); + // topic exists + createTopic2TopicExistsTest(); + // invalid topic + createTopic2InvalidTopicTest(); + // success + createTopic2SuccessTest(); + } + + private void createTopic2NullPointerExceptionTest() { + ClusterDO clusterDO = getClusterDO(); + clusterDO.setZookeeper(null); + clusterDO.setBootstrapServers(null); + ResultStatus result = TopicCommands.createTopic( + clusterDO, + TEST_CREATE_TOPIC, + PARTITION_NUM, + REPLICA_NUM, + Arrays.asList(BROKER_ID), + new Properties() + ); + Assert.assertEquals(result.getCode(), ResultStatus.TOPIC_OPERATION_PARAM_NULL_POINTER.getCode()); + } + + private void createTopic2InvalidPartitionsTest() { + ClusterDO clusterDO = getClusterDO(); + ResultStatus result = TopicCommands.createTopic( + clusterDO, + TEST_CREATE_TOPIC, + -1, + REPLICA_NUM, + Arrays.asList(BROKER_ID), + new Properties() + ); + Assert.assertEquals(result.getCode(), ResultStatus.TOPIC_OPERATION_PARTITION_NUM_ILLEGAL.getCode()); + } + + private void createTopic2InvalidReplicationFactorTest() { + ClusterDO clusterDO = getClusterDO(); + ResultStatus result = TopicCommands.createTopic( + clusterDO, + TEST_CREATE_TOPIC, + PARTITION_NUM, + REPLICA_NUM, + Collections.emptyList(), + new Properties() + ); + Assert.assertEquals(result.getCode(), ResultStatus.BROKER_NUM_NOT_ENOUGH.getCode()); + } + + private void createTopic2TopicExistsTest() { + ClusterDO clusterDO = getClusterDO(); + ResultStatus result = TopicCommands.createTopic( + clusterDO, + REAL_TOPIC_IN_ZK, + PARTITION_NUM, + REPLICA_NUM, + Arrays.asList(BROKER_ID), + new Properties() + ); + Assert.assertEquals(result.getCode(), ResultStatus.TOPIC_OPERATION_TOPIC_EXISTED.getCode()); + } + + private void createTopic2InvalidTopicTest() { + ClusterDO clusterDO = getClusterDO(); + ResultStatus result = TopicCommands.createTopic( + clusterDO, + INVALID_TOPIC, + PARTITION_NUM, + REPLICA_NUM, + Arrays.asList(BROKER_ID), + new Properties() + ); + Assert.assertEquals(result.getCode(), ResultStatus.TOPIC_OPERATION_TOPIC_NAME_ILLEGAL.getCode()); + } + + private void createTopic2SuccessTest() { + ClusterDO clusterDO = getClusterDO(); + ResultStatus result = TopicCommands.createTopic( + clusterDO, + TEST_CREATE_TOPIC, + PARTITION_NUM, + REPLICA_NUM, + Arrays.asList(BROKER_ID), + new Properties() + ); + Assert.assertEquals(result.getCode(), ResultStatus.SUCCESS.getCode()); + } + + @Test(description = "测试修改topic配置") + public void modifyTopicConfigTest() { + // AdminOperationException + modifyTopicConfig2AdminOperationExceptionTest(); + // InvalidConfigurationException + modifyTopicConfig2InvalidConfigurationExceptionTest(); + // success + modifyTopicConfig2SuccessTest(); + } + + private void modifyTopicConfig2AdminOperationExceptionTest() { + ClusterDO clusterDO = getClusterDO(); + ResultStatus result = TopicCommands.modifyTopicConfig(clusterDO, INVALID_TOPIC, new Properties()); + Assert.assertEquals(result.getCode(), ResultStatus.TOPIC_OPERATION_UNKNOWN_TOPIC_PARTITION.getCode()); + } + + private void modifyTopicConfig2InvalidConfigurationExceptionTest() { + ClusterDO clusterDO = getClusterDO(); + Properties properties = new Properties(); + properties.setProperty("xxx", "xxx"); + ResultStatus result = TopicCommands.modifyTopicConfig(clusterDO, REAL_TOPIC_IN_ZK, properties); + Assert.assertEquals(result.getCode(), ResultStatus.TOPIC_OPERATION_TOPIC_CONFIG_ILLEGAL.getCode()); + } + + private void modifyTopicConfig2SuccessTest() { + ClusterDO clusterDO = getClusterDO(); + Properties properties = new Properties(); + ResultStatus result = TopicCommands.modifyTopicConfig(clusterDO, REAL_TOPIC_IN_ZK, properties); + Assert.assertEquals(result.getCode(), ResultStatus.SUCCESS.getCode()); + } + + @Test(description = "测试扩分区") + public void expandTopicTest() { + // failed + expandTopic2FailureTest(); + // success + expandTopic2SuccessTest(); + } + + private void expandTopic2FailureTest() { + ClusterDO clusterDO = getClusterDO(); + ResultStatus result = TopicCommands.expandTopic( + clusterDO, + INVALID_TOPIC, + PARTITION_NUM + 1, + Arrays.asList(BROKER_ID, 2) + ); + Assert.assertEquals(result.getCode(), ResultStatus.TOPIC_OPERATION_UNKNOWN_ERROR.getCode()); + } + + private void expandTopic2SuccessTest() { + ClusterDO clusterDO = getClusterDO(); + ResultStatus result = TopicCommands.expandTopic( + clusterDO, + TEST_CREATE_TOPIC, + PARTITION_NUM + 1, + Arrays.asList(BROKER_ID, 2) + ); + Assert.assertEquals(result.getCode(), ResultStatus.SUCCESS.getCode()); + } + + @Test(description = "测试删除Topic") + public void deleteTopicTest() { + // UnknownTopicOrPartitionException + deleteTopic2UnknownTopicOrPartitionExceptionTest(); + // NullPointerException + deleteTopic2NullPointerExceptionTest(); + // success + deleteTopic2SuccessTest(); + } + + private void deleteTopic2UnknownTopicOrPartitionExceptionTest() { + ClusterDO clusterDO = getClusterDO(); + ResultStatus result = TopicCommands.deleteTopic(clusterDO, INVALID_TOPIC); + Assert.assertEquals(result.getCode(), ResultStatus.TOPIC_OPERATION_UNKNOWN_TOPIC_PARTITION.getCode()); + } + + private void deleteTopic2NullPointerExceptionTest() { + ClusterDO clusterDO = getClusterDO(); + clusterDO.setBootstrapServers(null); + clusterDO.setZookeeper(null); + ResultStatus result = TopicCommands.deleteTopic(clusterDO, INVALID_TOPIC); + Assert.assertEquals(result.getCode(), ResultStatus.TOPIC_OPERATION_UNKNOWN_ERROR.getCode()); + } + + private void deleteTopic2SuccessTest() { + ClusterDO clusterDO = getClusterDO(); + ResultStatus result = TopicCommands.deleteTopic(clusterDO, TEST_CREATE_TOPIC); + Assert.assertEquals(result.getCode(), ResultStatus.SUCCESS.getCode()); + } +} diff --git a/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/utils/TopicReassignUtilsTest.java b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/utils/TopicReassignUtilsTest.java new file mode 100644 index 00000000..4cc05c02 --- /dev/null +++ b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/utils/TopicReassignUtilsTest.java @@ -0,0 +1,78 @@ +package com.xiaojukeji.kafka.manager.service.utils; + +import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; +import com.xiaojukeji.kafka.manager.service.config.BaseTest; +import org.junit.Assert; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.Date; + +/** + * @author xuguang + * @Date 2022/1/6 + */ +public class TopicReassignUtilsTest extends BaseTest { + + private final static Long REAL_CLUSTER_ID_IN_MYSQL = 1L; + + private final static String TEST_CREATE_TOPIC = "createTopicTest"; + + private final static String REAL_TOPIC_IN_ZK = "moduleTest"; + + private final static String INVALID_TOPIC = ".,&"; + + private final static Integer PARTITION_NUM = 1; + + private final static Integer REPLICA_NUM = 1; + + private final static Integer BROKER_ID = 1; + + private final static Integer PARTITION_ID = 1; + + public ClusterDO getClusterDO() { + ClusterDO clusterDO = new ClusterDO(); + clusterDO.setId(REAL_CLUSTER_ID_IN_MYSQL); + clusterDO.setClusterName("LogiKM_moduleTest"); + clusterDO.setZookeeper("10.190.46.198:2181,10.190.14.237:2181,10.190.50.65:2181/xg"); + clusterDO.setBootstrapServers("10.190.46.198:9093,10.190.14.237:9093,10.190.50.65:9093"); + clusterDO.setSecurityProperties("{ \t\"security.protocol\": \"SASL_PLAINTEXT\", \t\"sasl.mechanism\": \"PLAIN\", \t\"sasl.jaas.config\": \"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\"dkm_admin\\\" password=\\\"km_kMl4N8as1Kp0CCY\\\";\" }"); + clusterDO.setStatus(1); + clusterDO.setGmtCreate(new Date()); + clusterDO.setGmtModify(new Date()); + return clusterDO; + } + + @Test + public void generateReassignmentJsonTest() { + // null + generateReassignmentJson2NullPointerExceptionTest(); + // not null + generateReassignmentJson2SuccessTest(); + } + + private void generateReassignmentJson2NullPointerExceptionTest() { + ClusterDO clusterDO = getClusterDO(); + clusterDO.setZookeeper(null); + String result = TopicReassignUtils.generateReassignmentJson( + clusterDO, + REAL_TOPIC_IN_ZK, + Arrays.asList(PARTITION_ID), + Arrays.asList(BROKER_ID) + ); + + Assert.assertNull(result); + } + + private void generateReassignmentJson2SuccessTest() { + ClusterDO clusterDO = getClusterDO(); + String result = TopicReassignUtils.generateReassignmentJson( + clusterDO, + REAL_TOPIC_IN_ZK, + Arrays.asList(PARTITION_ID), + Arrays.asList(BROKER_ID) + ); + + Assert.assertNotNull(result); + } +} diff --git a/kafka-manager-extends/kafka-manager-kcm/src/test/java/com/xiaojukeji/kafka/manager/kcm/ClusterHostTaskServiceTest.java b/kafka-manager-extends/kafka-manager-kcm/src/test/java/com/xiaojukeji/kafka/manager/kcm/ClusterHostTaskServiceTest.java new file mode 100644 index 00000000..ccb7e594 --- /dev/null +++ b/kafka-manager-extends/kafka-manager-kcm/src/test/java/com/xiaojukeji/kafka/manager/kcm/ClusterHostTaskServiceTest.java @@ -0,0 +1,80 @@ +package com.xiaojukeji.kafka.manager.kcm; + +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.kcm.common.entry.ao.CreationTaskData; +import com.xiaojukeji.kafka.manager.kcm.common.entry.dto.ClusterHostTaskDTO; +import com.xiaojukeji.kafka.manager.kcm.config.BaseTest; +import com.xiaojukeji.kafka.manager.kcm.tasks.ClusterHostTaskService; +import org.junit.Assert; +import org.mockito.InjectMocks; +import org.springframework.beans.factory.annotation.Autowired; +import org.testng.annotations.Test; + +import java.util.Arrays; + +/** + * @author xuguang + * @Date 2022/1/5 + */ +public class ClusterHostTaskServiceTest extends BaseTest { + + @Autowired + @InjectMocks + private ClusterHostTaskService clusterHostTaskService; + + private ClusterHostTaskDTO getClusterHostTaskDTO() { + ClusterHostTaskDTO clusterHostTaskDTO = new ClusterHostTaskDTO(); + clusterHostTaskDTO.setClusterId(-1L); + clusterHostTaskDTO.setHostList(Arrays.asList("127.0.0.1")); + clusterHostTaskDTO.setTaskType(""); + clusterHostTaskDTO.setKafkaFileBaseUrl(""); + clusterHostTaskDTO.setKafkaPackageMd5(""); + clusterHostTaskDTO.setKafkaPackageName(""); + clusterHostTaskDTO.setServerPropertiesMd5(""); + clusterHostTaskDTO.setServerPropertiesName(""); + return clusterHostTaskDTO; + } + + @Test + public void getOperationHostsTest() { + // cluster task host list illegal + getOperationHosts2HostListIllegalTest(); + // success + getOperationHosts2SuccessTest(); + } + + private void getOperationHosts2HostListIllegalTest() { + ClusterHostTaskDTO clusterHostTaskDTO = getClusterHostTaskDTO(); + clusterHostTaskDTO.setHostList(Arrays.asList("")); + Result result = clusterHostTaskService.getOperationHosts(clusterHostTaskDTO); + Assert.assertEquals(result.getCode(), ResultStatus.CLUSTER_TASK_HOST_LIST_ILLEGAL.getCode()); + } + + private void getOperationHosts2SuccessTest() { + ClusterHostTaskDTO clusterHostTaskDTO = getClusterHostTaskDTO(); + Result result = clusterHostTaskService.getOperationHosts(clusterHostTaskDTO); + Assert.assertEquals(result.getCode(), ResultStatus.SUCCESS.getCode()); + } + + @Test + public void getCreateTaskParamDTOTest() { + // not success + getCreateTaskParamDTO2NotSuccessTest(); + // success + getCreateTaskParamDTO2SuccessTest(); + } + + private void getCreateTaskParamDTO2NotSuccessTest() { + ClusterHostTaskDTO clusterHostTaskDTO = getClusterHostTaskDTO(); + clusterHostTaskDTO.setHostList(Arrays.asList("")); + Result dto = clusterHostTaskService.getCreateTaskParamDTO(clusterHostTaskDTO); + Assert.assertEquals(dto.getCode(), ResultStatus.CLUSTER_TASK_HOST_LIST_ILLEGAL.getCode()); + } + + private void getCreateTaskParamDTO2SuccessTest() { + ClusterHostTaskDTO clusterHostTaskDTO = getClusterHostTaskDTO(); + Result dto = clusterHostTaskService.getCreateTaskParamDTO(clusterHostTaskDTO); + Assert.assertEquals(dto.getCode(), ResultStatus.SUCCESS.getCode()); + } +} diff --git a/kafka-manager-extends/kafka-manager-kcm/src/test/java/com/xiaojukeji/kafka/manager/kcm/ClusterRoleTaskServiceTest.java b/kafka-manager-extends/kafka-manager-kcm/src/test/java/com/xiaojukeji/kafka/manager/kcm/ClusterRoleTaskServiceTest.java new file mode 100644 index 00000000..7943a88e --- /dev/null +++ b/kafka-manager-extends/kafka-manager-kcm/src/test/java/com/xiaojukeji/kafka/manager/kcm/ClusterRoleTaskServiceTest.java @@ -0,0 +1,74 @@ +package com.xiaojukeji.kafka.manager.kcm; + +import com.xiaojukeji.kafka.manager.common.bizenum.KafkaBrokerRoleEnum; +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.kcm.common.entry.ao.CreationTaskData; +import com.xiaojukeji.kafka.manager.kcm.common.entry.dto.ClusterRoleTaskDTO; +import com.xiaojukeji.kafka.manager.kcm.config.BaseTest; +import com.xiaojukeji.kafka.manager.kcm.tasks.ClusterRoleTaskService; +import org.junit.Assert; +import org.mockito.InjectMocks; +import org.springframework.beans.factory.annotation.Autowired; +import org.testng.annotations.Test; + +import java.util.*; + +/** + * @author xuguang + * @Date 2022/1/5 + */ +public class ClusterRoleTaskServiceTest extends BaseTest { + + @Autowired + @InjectMocks + private ClusterRoleTaskService clusterRoleTaskService; + + private ClusterRoleTaskDTO getClusterRoleTaskDTO() { + ClusterRoleTaskDTO clusterRoleTaskDTO = new ClusterRoleTaskDTO(); + clusterRoleTaskDTO.setClusterId(-1L); + clusterRoleTaskDTO.setTaskType(""); + return clusterRoleTaskDTO; + } + + @Test + public void getOperationHostsTest() { + // controller not alive + getOperationHosts2ControllerNotAliveTest(); + // success + getOperationHosts2SuccessTest(); + } + + private void getOperationHosts2ControllerNotAliveTest() { + ClusterRoleTaskDTO dto = getClusterRoleTaskDTO(); + List upgradeSequenceList = new ArrayList<>(); + upgradeSequenceList.add(KafkaBrokerRoleEnum.CONTROLLER.getRole()); + dto.setUpgradeSequenceList(upgradeSequenceList); + dto.setKafkaRoleBrokerHostMap(new HashMap<>(0)); + + Result result = clusterRoleTaskService.getOperationHosts(dto); + Assert.assertEquals(result.getCode(), ResultStatus.CONTROLLER_NOT_ALIVE.getCode()); + } + + private void getOperationHosts2SuccessTest() { + ClusterRoleTaskDTO dto = getClusterRoleTaskDTO(); + List upgradeSequenceList = new ArrayList<>(); + upgradeSequenceList.add(KafkaBrokerRoleEnum.CONTROLLER.getRole()); + upgradeSequenceList.add(KafkaBrokerRoleEnum.NORMAL.getRole()); + upgradeSequenceList.add(KafkaBrokerRoleEnum.COORDINATOR.getRole()); + dto.setUpgradeSequenceList(upgradeSequenceList); + Map> map = new HashMap<>(); + List controllerList = new ArrayList<>(); + controllerList.add("127.0.0.1"); + controllerList.add("localhost"); + List coordinatorList = new ArrayList<>(); + coordinatorList.add("127.0.0.1"); + coordinatorList.add("localhost"); + map.put(KafkaBrokerRoleEnum.CONTROLLER.getRole(), controllerList); + map.put(KafkaBrokerRoleEnum.COORDINATOR.getRole(), coordinatorList); + dto.setKafkaRoleBrokerHostMap(map); + + Result result = clusterRoleTaskService.getOperationHosts(dto); + Assert.assertEquals(result.getCode(), ResultStatus.SUCCESS.getCode()); + } +} diff --git a/kafka-manager-extends/kafka-manager-kcm/src/test/java/com/xiaojukeji/kafka/manager/kcm/ClusterTaskServiceTest.java b/kafka-manager-extends/kafka-manager-kcm/src/test/java/com/xiaojukeji/kafka/manager/kcm/ClusterTaskServiceTest.java index c204a0e5..35acc87e 100644 --- a/kafka-manager-extends/kafka-manager-kcm/src/test/java/com/xiaojukeji/kafka/manager/kcm/ClusterTaskServiceTest.java +++ b/kafka-manager-extends/kafka-manager-kcm/src/test/java/com/xiaojukeji/kafka/manager/kcm/ClusterTaskServiceTest.java @@ -1,8 +1,354 @@ package com.xiaojukeji.kafka.manager.kcm; +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterTaskDO; +import com.xiaojukeji.kafka.manager.dao.ClusterTaskDao; +import com.xiaojukeji.kafka.manager.kcm.common.bizenum.ClusterTaskActionEnum; +import com.xiaojukeji.kafka.manager.kcm.common.bizenum.ClusterTaskStateEnum; +import com.xiaojukeji.kafka.manager.kcm.common.bizenum.ClusterTaskTypeEnum; +import com.xiaojukeji.kafka.manager.kcm.common.entry.ao.ClusterTaskLog; +import com.xiaojukeji.kafka.manager.kcm.common.entry.ao.ClusterTaskStatus; +import com.xiaojukeji.kafka.manager.kcm.common.entry.dto.AbstractClusterTaskDTO; +import com.xiaojukeji.kafka.manager.kcm.common.entry.dto.ClusterHostTaskDTO; +import com.xiaojukeji.kafka.manager.kcm.component.agent.AbstractAgent; +import com.xiaojukeji.kafka.manager.kcm.config.BaseTest; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.springframework.beans.factory.annotation.Autowired; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.Arrays; + /** * @author xuguang * @Date 2021/12/27 */ -public class ClusterTaskServiceTest { +public class ClusterTaskServiceTest extends BaseTest { + + private static final Long REAL_CLUSTER_ID_IN_MYSQL = 1L; + + private static final String ADMIN = "admin"; + + private static final String BASEURL = "127.0.0.1"; + + private static final String MD5 = "md5"; + + private static final Long REAL_TASK_ID_IN_MYSQL = 1L; + + private static final Long INVALID_TASK_ID = -1L; + + + @Autowired + @InjectMocks + private ClusterTaskService clusterTaskService; + + @Mock + private AbstractAgent abstractAgent; + + @Mock + private ClusterTaskDao clusterTaskDao; + + private ClusterHostTaskDTO getClusterHostTaskDTO() { + ClusterHostTaskDTO clusterHostTaskDTO = new ClusterHostTaskDTO(); + clusterHostTaskDTO.setClusterId(-1L); + clusterHostTaskDTO.setHostList(Arrays.asList(BASEURL)); + clusterHostTaskDTO.setTaskType(ClusterTaskTypeEnum.HOST_UPGRADE.getName()); + clusterHostTaskDTO.setKafkaFileBaseUrl(BASEURL); + clusterHostTaskDTO.setKafkaPackageMd5(MD5); + clusterHostTaskDTO.setKafkaPackageName("name"); + clusterHostTaskDTO.setServerPropertiesMd5(MD5); + clusterHostTaskDTO.setServerPropertiesName("name"); + return clusterHostTaskDTO; + } + + private ClusterTaskDO getClusterTaskDO() { + ClusterTaskDO clusterTaskDO = new ClusterTaskDO(); + clusterTaskDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL); + clusterTaskDO.setId(REAL_TASK_ID_IN_MYSQL); + clusterTaskDO.setAgentTaskId(-1L); + clusterTaskDO.setAgentRollbackTaskId(-1L); + clusterTaskDO.setTaskStatus(0); + return clusterTaskDO; + } + + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + } + + @Test(description = "测试创建任务") + public void createTaskTest() { + // paramIllegal + createTask2ParamIllegalTest(); + // not success + createTask2NotSuccessTest(); + // CallClusterTaskAgentFailed + createTask2CallClusterTaskAgentFailedTest(); + // success + createTask2SuccessTest(); + // mysqlError + createTask2MysqlErrorTest(); + } + + private void createTask2ParamIllegalTest() { + AbstractClusterTaskDTO dto = getClusterHostTaskDTO(); + dto.setTaskType(null); + Result result1 = clusterTaskService.createTask(dto, ADMIN); + Assert.assertEquals(result1.getCode(), ResultStatus.PARAM_ILLEGAL.getCode()); + + dto.setTaskType(""); + Result result2 = clusterTaskService.createTask(dto, ADMIN); + Assert.assertEquals(result2.getCode(), ResultStatus.PARAM_ILLEGAL.getCode()); + } + + private void createTask2NotSuccessTest() { + ClusterHostTaskDTO clusterHostTaskDTO = getClusterHostTaskDTO(); + clusterHostTaskDTO.setHostList(Arrays.asList("host")); + Result result = clusterTaskService.createTask(clusterHostTaskDTO, ADMIN); + Assert.assertEquals(result.getCode(), ResultStatus.CLUSTER_TASK_HOST_LIST_ILLEGAL.getCode()); + } + + private void createTask2CallClusterTaskAgentFailedTest() { + Mockito.when(abstractAgent.createTask(Mockito.any())).thenReturn(null); + + ClusterHostTaskDTO clusterHostTaskDTO = getClusterHostTaskDTO(); + Result result = clusterTaskService.createTask(clusterHostTaskDTO, ADMIN); + Assert.assertEquals(result.getCode(), ResultStatus.CALL_CLUSTER_TASK_AGENT_FAILED.getCode()); + } + + private void createTask2SuccessTest() { + Mockito.when(abstractAgent.createTask(Mockito.any())).thenReturn(Result.buildSuc(1L)); + Mockito.when(clusterTaskDao.insert(Mockito.any())).thenReturn(1); + + ClusterHostTaskDTO clusterHostTaskDTO = getClusterHostTaskDTO(); + Result result = clusterTaskService.createTask(clusterHostTaskDTO, ADMIN); + Assert.assertEquals(result.getCode(), ResultStatus.SUCCESS.getCode()); + } + + private void createTask2MysqlErrorTest() { + Mockito.when(abstractAgent.createTask(Mockito.any())).thenReturn(Result.buildSuc(1L)); + Mockito.when(clusterTaskDao.insert(Mockito.any())).thenThrow(RuntimeException.class); + + ClusterHostTaskDTO clusterHostTaskDTO = getClusterHostTaskDTO(); + Result result = clusterTaskService.createTask(clusterHostTaskDTO, ADMIN); + Assert.assertEquals(result.getCode(), ResultStatus.MYSQL_ERROR.getCode()); + } + + @Test + public void executeTaskTest() { + // task not exist + executeTask2TaskNotExistTest(); + // CallClusterTaskAgentFailed + executeTask2CallClusterTaskAgentFailedTest(); + // 暂停状态, 可以执行开始 + executeTask2StartTest(); + // 运行状态, 可以执行暂停 + executeTask2PauseTest(); + // 忽略 + executeTask2IgnoreTest(); + // 取消 + executeTask2CancelTest(); + // operation failed + executeTask2OperationFailedTest(); + // 回滚 operation forbidden + executeTask2RollbackForbiddenTest(); + } + + private void executeTask2TaskNotExistTest() { + ResultStatus resultStatus = clusterTaskService.executeTask(INVALID_TASK_ID, ClusterTaskActionEnum.START.getAction(), ADMIN); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.RESOURCE_NOT_EXIST.getCode()); + } + + private void executeTask2CallClusterTaskAgentFailedTest() { + Mockito.when(abstractAgent.getTaskExecuteState(Mockito.anyLong())).thenReturn(null); + ClusterTaskDO clusterTaskDO = getClusterTaskDO(); + Mockito.when(clusterTaskDao.getById(Mockito.anyLong())).thenReturn(clusterTaskDO); + + ResultStatus resultStatus = clusterTaskService.executeTask(REAL_TASK_ID_IN_MYSQL, ClusterTaskActionEnum.START.getAction(), ADMIN); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.CALL_CLUSTER_TASK_AGENT_FAILED.getCode()); + } + + private void executeTask2StartTest() { + Mockito.when(abstractAgent.getTaskExecuteState(Mockito.anyLong())).thenReturn(Result.buildSuc(ClusterTaskStateEnum.BLOCKED)); + ClusterTaskDO clusterTaskDO = getClusterTaskDO(); + Mockito.when(clusterTaskDao.getById(Mockito.anyLong())).thenReturn(clusterTaskDO); + + // success + Mockito.when(abstractAgent.actionTask(Mockito.anyLong(), Mockito.any())).thenReturn(true); + ResultStatus resultStatus = clusterTaskService.executeTask(REAL_TASK_ID_IN_MYSQL, ClusterTaskActionEnum.START.getAction(), ADMIN); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode()); + + // operation failed + Mockito.when(abstractAgent.actionTask(Mockito.anyLong(), Mockito.any())).thenReturn(false); + ResultStatus resultStatus2 = clusterTaskService.executeTask(REAL_TASK_ID_IN_MYSQL, ClusterTaskActionEnum.START.getAction(), ADMIN); + Assert.assertEquals(resultStatus2.getCode(), ResultStatus.OPERATION_FAILED.getCode()); + } + + private void executeTask2PauseTest() { + Mockito.when(abstractAgent.getTaskExecuteState(Mockito.anyLong())).thenReturn(Result.buildSuc(ClusterTaskStateEnum.RUNNING)); + ClusterTaskDO clusterTaskDO = getClusterTaskDO(); + Mockito.when(clusterTaskDao.getById(Mockito.anyLong())).thenReturn(clusterTaskDO); + + // success + Mockito.when(abstractAgent.actionTask(Mockito.anyLong(), Mockito.any())).thenReturn(true); + ResultStatus resultStatus = clusterTaskService.executeTask(REAL_TASK_ID_IN_MYSQL, ClusterTaskActionEnum.PAUSE.getAction(), ADMIN); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode()); + + // operation failed + Mockito.when(abstractAgent.actionTask(Mockito.anyLong(), Mockito.any())).thenReturn(false); + ResultStatus resultStatus2 = clusterTaskService.executeTask(REAL_TASK_ID_IN_MYSQL, ClusterTaskActionEnum.PAUSE.getAction(), ADMIN); + Assert.assertEquals(resultStatus2.getCode(), ResultStatus.OPERATION_FAILED.getCode()); + } + + private void executeTask2IgnoreTest() { + Mockito.when(abstractAgent.getTaskExecuteState(Mockito.anyLong())).thenReturn(Result.buildSuc(ClusterTaskStateEnum.RUNNING)); + ClusterTaskDO clusterTaskDO = getClusterTaskDO(); + Mockito.when(clusterTaskDao.getById(Mockito.anyLong())).thenReturn(clusterTaskDO); + + // success + Mockito.when(abstractAgent.actionTask(Mockito.anyLong(), Mockito.any())).thenReturn(true); + ResultStatus resultStatus = clusterTaskService.executeTask(REAL_TASK_ID_IN_MYSQL, ClusterTaskActionEnum.IGNORE.getAction(), ""); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode()); + + // operation failed + Mockito.when(abstractAgent.actionTask(Mockito.anyLong(), Mockito.any())).thenReturn(false); + ResultStatus resultStatus2 = clusterTaskService.executeTask(REAL_TASK_ID_IN_MYSQL, ClusterTaskActionEnum.IGNORE.getAction(), ""); + Assert.assertEquals(resultStatus2.getCode(), ResultStatus.OPERATION_FAILED.getCode()); + } + + private void executeTask2CancelTest() { + Mockito.when(abstractAgent.getTaskExecuteState(Mockito.anyLong())).thenReturn(Result.buildSuc(ClusterTaskStateEnum.RUNNING)); + ClusterTaskDO clusterTaskDO = getClusterTaskDO(); + Mockito.when(clusterTaskDao.getById(Mockito.anyLong())).thenReturn(clusterTaskDO); + + // success + Mockito.when(abstractAgent.actionTask(Mockito.anyLong(), Mockito.any())).thenReturn(true); + ResultStatus resultStatus = clusterTaskService.executeTask(REAL_TASK_ID_IN_MYSQL, ClusterTaskActionEnum.CANCEL.getAction(), ""); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode()); + + // operation failed + Mockito.when(abstractAgent.actionTask(Mockito.anyLong(), Mockito.any())).thenReturn(false); + ResultStatus resultStatus2 = clusterTaskService.executeTask(REAL_TASK_ID_IN_MYSQL, ClusterTaskActionEnum.CANCEL.getAction(), ""); + Assert.assertEquals(resultStatus2.getCode(), ResultStatus.OPERATION_FAILED.getCode()); + } + + private void executeTask2OperationFailedTest() { + Mockito.when(abstractAgent.getTaskExecuteState(Mockito.anyLong())).thenReturn(Result.buildSuc(ClusterTaskStateEnum.RUNNING)); + ClusterTaskDO clusterTaskDO = getClusterTaskDO(); + Mockito.when(clusterTaskDao.getById(Mockito.anyLong())).thenReturn(clusterTaskDO); + + // operation failed + ResultStatus resultStatus2 = clusterTaskService.executeTask(REAL_TASK_ID_IN_MYSQL, ClusterTaskActionEnum.START.getAction(), ADMIN); + Assert.assertEquals(resultStatus2.getCode(), ResultStatus.OPERATION_FAILED.getCode()); + } + + private void executeTask2RollbackForbiddenTest() { + Mockito.when(abstractAgent.getTaskExecuteState(Mockito.anyLong())).thenReturn(Result.buildSuc(ClusterTaskStateEnum.RUNNING)); + ClusterTaskDO clusterTaskDO = getClusterTaskDO(); + Mockito.when(clusterTaskDao.getById(Mockito.anyLong())).thenReturn(clusterTaskDO); + + // operation failed + ResultStatus resultStatus2 = clusterTaskService.executeTask(REAL_TASK_ID_IN_MYSQL, ClusterTaskActionEnum.ROLLBACK.getAction(), ADMIN); + Assert.assertEquals(resultStatus2.getCode(), ResultStatus.OPERATION_FORBIDDEN.getCode()); + } + + @Test + public void getTaskLogTest() { + // task not exist + getTaskLog2TaskNotExistTest(); + // call cluster task agent failed + getTaskLog2CallClusterTaskAgentFailedTest(); + // success + getTaskLog2SuccessTest(); + } + + private void getTaskLog2TaskNotExistTest() { + Mockito.when(clusterTaskDao.getById(Mockito.anyLong())).thenReturn(null); + + Result result = clusterTaskService.getTaskLog(REAL_TASK_ID_IN_MYSQL, ADMIN); + Assert.assertEquals(result.getCode(), ResultStatus.TASK_NOT_EXIST.getCode()); + } + + private void getTaskLog2CallClusterTaskAgentFailedTest() { + ClusterTaskDO clusterTaskDO = getClusterTaskDO(); + Mockito.when(clusterTaskDao.getById(Mockito.anyLong())).thenReturn(clusterTaskDO); + Mockito.when(abstractAgent.getTaskLog(Mockito.anyLong(), Mockito.any())).thenReturn(null); + + Result result = clusterTaskService.getTaskLog(REAL_TASK_ID_IN_MYSQL, ADMIN); + Assert.assertEquals(result.getCode(), ResultStatus.CALL_CLUSTER_TASK_AGENT_FAILED.getCode()); + } + + private void getTaskLog2SuccessTest() { + ClusterTaskDO clusterTaskDO = getClusterTaskDO(); + Mockito.when(clusterTaskDao.getById(Mockito.anyLong())).thenReturn(clusterTaskDO); + Mockito.when(abstractAgent.getTaskLog(Mockito.anyLong(), Mockito.any())) + .thenReturn(Result.buildSuc(new ClusterTaskLog(""))); + + Result result = clusterTaskService.getTaskLog(REAL_TASK_ID_IN_MYSQL, ADMIN); + Assert.assertEquals(result.getCode(), ResultStatus.SUCCESS.getCode()); + } + + @Test + public void getTaskStateTest() { + // null + getTaskState2NullTest(); + // + getTaskState2NotNullTest(); + } + + private void getTaskState2NullTest() { + Mockito.when(abstractAgent.getTaskExecuteState(Mockito.anyLong())).thenReturn(null); + + ClusterTaskStateEnum taskState = clusterTaskService.getTaskState(REAL_TASK_ID_IN_MYSQL); + Assert.assertNull(taskState); + } + + private void getTaskState2NotNullTest() { + + Mockito.when(abstractAgent.getTaskExecuteState(Mockito.anyLong())).thenReturn(Result.buildSuc(ClusterTaskStateEnum.RUNNING)); + + ClusterTaskStateEnum taskState = clusterTaskService.getTaskState(REAL_TASK_ID_IN_MYSQL); + Assert.assertNotNull(taskState); + } + + @Test + public void getTaskStatusTest() { + // task not exist + getTaskStatus2TaskNotExistTest(); + // get failed + getTaskStatus2FailedTest(); + // success + getTaskStatus2SuccessTest(); + } + + private void getTaskStatus2TaskNotExistTest() { + Mockito.when(clusterTaskDao.getById(Mockito.anyLong())).thenReturn(null); + + Result result = clusterTaskService.getTaskStatus(REAL_TASK_ID_IN_MYSQL); + Assert.assertEquals(result.getCode(), ResultStatus.TASK_NOT_EXIST.getCode()); + } + + private void getTaskStatus2FailedTest() { + ClusterTaskDO clusterTaskDO = getClusterTaskDO(); + Mockito.when(clusterTaskDao.getById(Mockito.anyLong())).thenReturn(clusterTaskDO); + Mockito.when(abstractAgent.getTaskExecuteState(Mockito.anyLong())).thenReturn(Result.buildFailure("")); + + Result result = clusterTaskService.getTaskStatus(REAL_TASK_ID_IN_MYSQL); + Assert.assertEquals(result.getCode(), ResultStatus.FAIL.getCode()); + } + + private void getTaskStatus2SuccessTest() { + ClusterTaskDO clusterTaskDO = getClusterTaskDO(); + Mockito.when(clusterTaskDao.getById(Mockito.anyLong())).thenReturn(clusterTaskDO); + Mockito.when(abstractAgent.getTaskExecuteState(Mockito.anyLong())) + .thenReturn(Result.buildSuc(ClusterTaskStateEnum.RUNNING)); + + Result result = clusterTaskService.getTaskStatus(REAL_TASK_ID_IN_MYSQL); + Assert.assertEquals(result.getCode(), ResultStatus.SUCCESS.getCode()); + } } diff --git a/kafka-manager-extends/kafka-manager-kcm/src/test/java/com/xiaojukeji/kafka/manager/kcm/KafkaFileServiceTest.java b/kafka-manager-extends/kafka-manager-kcm/src/test/java/com/xiaojukeji/kafka/manager/kcm/KafkaFileServiceTest.java new file mode 100644 index 00000000..bc119bdd --- /dev/null +++ b/kafka-manager-extends/kafka-manager-kcm/src/test/java/com/xiaojukeji/kafka/manager/kcm/KafkaFileServiceTest.java @@ -0,0 +1,224 @@ +package com.xiaojukeji.kafka.manager.kcm; + +import com.xiaojukeji.kafka.manager.common.bizenum.KafkaFileEnum; +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.dto.normal.KafkaFileDTO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.KafkaFileDO; +import com.xiaojukeji.kafka.manager.dao.KafkaFileDao; +import com.xiaojukeji.kafka.manager.kcm.component.storage.AbstractStorageService; +import com.xiaojukeji.kafka.manager.kcm.config.BaseTest; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.dao.DuplicateKeyException; +import org.springframework.mock.web.MockMultipartFile; +import org.springframework.web.multipart.MultipartFile; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * @author xuguang + * @Date 2022/1/4 + */ +public class KafkaFileServiceTest extends BaseTest { + + private static final Long KAFKA_FILE_ID = 1L; + + private static final String ADMIN = "admin"; + + private KafkaFileDTO getKafkaFileDTO() { + KafkaFileDTO kafkaFileDTO = new KafkaFileDTO(); + kafkaFileDTO.setId(KAFKA_FILE_ID); + kafkaFileDTO.setClusterId(-1L); + kafkaFileDTO.setFileMd5(""); + kafkaFileDTO.setFileName(".tgz"); + kafkaFileDTO.setFileType(KafkaFileEnum.PACKAGE.getCode()); + kafkaFileDTO.setUploadFile(new MockMultipartFile("name", new byte[]{1})); + return kafkaFileDTO; + } + + private KafkaFileDO getKafkaFileDO() { + KafkaFileDO kafkaFileDO = new KafkaFileDO(); + kafkaFileDO.setFileType(KafkaFileEnum.PACKAGE.getCode()); + kafkaFileDO.setFileName(".tgz"); + return kafkaFileDO; + } + + @Autowired + @InjectMocks + private KafkaFileService kafkaFileService; + + @Mock + private KafkaFileDao kafkaFileDao; + + @Mock + private AbstractStorageService storageService; + + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void uploadKafkaFile() { + // paramIllegal + uploadKafkaFile2ParamIllegalTest(); + // mysqlError + uploadKafkaFile2MysqlErrorTest(); + // storage upload file failed + uploadKafkaFile2StorageUploadFileFailedTest(); + // success + uploadKafkaFile2SuccessTest(); + // DuplicateKey + uploadKafkaFile2DuplicateKeyTest(); + } + + private void uploadKafkaFile2ParamIllegalTest() { + KafkaFileDTO kafkaFileDTO = getKafkaFileDTO(); + kafkaFileDTO.setUploadFile(null); + ResultStatus resultStatus = kafkaFileService.uploadKafkaFile(kafkaFileDTO, ADMIN); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.PARAM_ILLEGAL.getCode()); + } + + private void uploadKafkaFile2MysqlErrorTest() { + Mockito.when(kafkaFileDao.insert(Mockito.any())).thenReturn(-1); + + KafkaFileDTO kafkaFileDTO = getKafkaFileDTO(); + ResultStatus resultStatus = kafkaFileService.uploadKafkaFile(kafkaFileDTO, ADMIN); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.MYSQL_ERROR.getCode()); + } + + private void uploadKafkaFile2StorageUploadFileFailedTest() { + Mockito.when(kafkaFileDao.insert(Mockito.any())).thenReturn(1); + Mockito.when(kafkaFileDao.deleteById(Mockito.any())).thenReturn(1); + Mockito.when(storageService.upload(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(false); + + KafkaFileDTO kafkaFileDTO = getKafkaFileDTO(); + ResultStatus resultStatus = kafkaFileService.uploadKafkaFile(kafkaFileDTO, ADMIN); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.STORAGE_UPLOAD_FILE_FAILED.getCode()); + } + + private void uploadKafkaFile2SuccessTest() { + Mockito.when(kafkaFileDao.insert(Mockito.any())).thenReturn(1); + Mockito.when(kafkaFileDao.deleteById(Mockito.any())).thenReturn(1); + Mockito.when(storageService.upload(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(true); + + KafkaFileDTO kafkaFileDTO = getKafkaFileDTO(); + ResultStatus resultStatus = kafkaFileService.uploadKafkaFile(kafkaFileDTO, ADMIN); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode()); + } + + private void uploadKafkaFile2DuplicateKeyTest() { + Mockito.when(kafkaFileDao.insert(Mockito.any())).thenThrow(DuplicateKeyException.class); + Mockito.when(kafkaFileDao.deleteById(Mockito.any())).thenReturn(1); + Mockito.when(storageService.upload(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(true); + + KafkaFileDTO kafkaFileDTO = getKafkaFileDTO(); + ResultStatus resultStatus = kafkaFileService.uploadKafkaFile(kafkaFileDTO, ADMIN); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.RESOURCE_ALREADY_EXISTED.getCode()); + } + + + @Test + public void modifyKafkaFileTest() { + // paramIllegal + modifyKafkaFile2ParamIllegalTest(); + // resource not exist + modifyKafkaFile2ResourceNotExistTest(); + // operation failed + modifyKafkaFile2OperationFailedTest(); + // mysqlError + modifyKafkaFile2MysqlErrorTest(); + // success + modifyKafkaFile2SuccessTest(); + } + + private void modifyKafkaFile2ParamIllegalTest() { + ResultStatus resultStatus = kafkaFileService.modifyKafkaFile(null, ADMIN); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.PARAM_ILLEGAL.getCode()); + } + + private void modifyKafkaFile2ResourceNotExistTest() { + Mockito.when(kafkaFileDao.getById(Mockito.anyLong())).thenReturn(null); + + KafkaFileDTO kafkaFileDTO = getKafkaFileDTO(); + ResultStatus resultStatus = kafkaFileService.modifyKafkaFile(kafkaFileDTO, ADMIN); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.RESOURCE_NOT_EXIST.getCode()); + } + + private void modifyKafkaFile2OperationFailedTest() { + Mockito.when(kafkaFileDao.getById(Mockito.anyLong())).thenReturn(new KafkaFileDO()); + Mockito.when(kafkaFileDao.updateById(Mockito.any())).thenReturn(-1); + + KafkaFileDTO kafkaFileDTO = getKafkaFileDTO(); + kafkaFileDTO.setFileType(-1); + ResultStatus resultStatus1 = kafkaFileService.modifyKafkaFile(kafkaFileDTO, ADMIN); + Assert.assertEquals(resultStatus1.getCode(), ResultStatus.OPERATION_FAILED.getCode()); + + kafkaFileDTO.setFileType(KafkaFileEnum.PACKAGE.getCode()); + kafkaFileDTO.setFileName("xxx"); + ResultStatus resultStatus2 = kafkaFileService.modifyKafkaFile(kafkaFileDTO, ADMIN); + Assert.assertEquals(resultStatus2.getCode(), ResultStatus.OPERATION_FAILED.getCode()); + } + + private void modifyKafkaFile2MysqlErrorTest() { + KafkaFileDO kafkaFileDO = getKafkaFileDO(); + Mockito.when(kafkaFileDao.getById(Mockito.anyLong())).thenReturn(kafkaFileDO); + Mockito.when(kafkaFileDao.updateById(Mockito.any())).thenReturn(-1); + + KafkaFileDTO kafkaFileDTO = getKafkaFileDTO(); + ResultStatus resultStatus1 = kafkaFileService.modifyKafkaFile(kafkaFileDTO, ADMIN); + Assert.assertEquals(resultStatus1.getCode(), ResultStatus.MYSQL_ERROR.getCode()); + + } + + private void modifyKafkaFile2SuccessTest() { + KafkaFileDO kafkaFileDO = getKafkaFileDO(); + Mockito.when(kafkaFileDao.getById(Mockito.anyLong())).thenReturn(kafkaFileDO); + Mockito.when(kafkaFileDao.updateById(Mockito.any())).thenReturn(1); + Mockito.when(storageService.upload(Mockito.any(), Mockito.any(), Mockito.any())).thenReturn(true); + + KafkaFileDTO kafkaFileDTO = getKafkaFileDTO(); + ResultStatus resultStatus1 = kafkaFileService.modifyKafkaFile(kafkaFileDTO, ADMIN); + Assert.assertEquals(resultStatus1.getCode(), ResultStatus.SUCCESS.getCode()); + } + + @Test + public void downloadKafkaFileTest() { + // resource not exist + downloadKafkaFile2ResourceNotExist(); + // STORAGE_FILE_TYPE_NOT_SUPPORT + downloadKafkaFile2FileNotSupportExist(); + // success + downloadKafkaFile2SuccessExist(); + } + + private void downloadKafkaFile2ResourceNotExist() { + Mockito.when(kafkaFileDao.getById(Mockito.anyLong())).thenReturn(null); + Result resultStatus = kafkaFileService.downloadKafkaFile(KAFKA_FILE_ID); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.RESOURCE_NOT_EXIST.getCode()); + } + + private void downloadKafkaFile2FileNotSupportExist() { + KafkaFileDO kafkaFileDO = getKafkaFileDO(); + Mockito.when(kafkaFileDao.getById(Mockito.anyLong())).thenReturn(kafkaFileDO); + Result resultStatus = kafkaFileService.downloadKafkaFile(KAFKA_FILE_ID); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.STORAGE_FILE_TYPE_NOT_SUPPORT.getCode()); + } + + private void downloadKafkaFile2SuccessExist() { + Mockito.when(storageService.download(Mockito.any(), Mockito.any())).thenReturn(Result.buildFrom(ResultStatus.SUCCESS)); + KafkaFileDO kafkaFileDO = getKafkaFileDO(); + kafkaFileDO.setFileType(KafkaFileEnum.SERVER_CONFIG.getCode()); + Mockito.when(kafkaFileDao.getById(Mockito.anyLong())).thenReturn(kafkaFileDO); + + Result resultStatus = kafkaFileService.downloadKafkaFile(KAFKA_FILE_ID); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode()); + } + + +}