Merge branch 'dev_v2.5.0_addtest' of github.com:didi/LogiKM into dev_v2.5.0_addtest

 Conflicts:
	kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/ExpertServiceTest.java
This commit is contained in:
didi
2022-01-07 11:50:15 +08:00
28 changed files with 4638 additions and 33 deletions

View File

@@ -0,0 +1,375 @@
package com.xiaojukeji.kafka.manager.service.service;
import com.xiaojukeji.kafka.manager.common.bizenum.TaskStatusEnum;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Arrays;
import java.util.Date;
import java.util.Properties;
/**
* @author xuguang
* @Date 2021/12/24
*/
public class AdminServiceTest extends BaseTest {
/**
* 集群共包括三个broker:1,2,3, 该topic 1分区 1副本因子在broker1上
*/
private final static String REAL_TOPIC1_IN_ZK = "moduleTest";
/**
* 集群共包括三个broker:1,2,3, 该topic 2分区 3副本因子在broker1,2,3上
*/
private final static String REAL_TOPIC2_IN_ZK = "xgTest";
private final static String INVALID_TOPIC = "xxxxx";
private final static String ZK_DEFAULT_TOPIC = "_consumer_offsets";
private final static String CREATE_TOPIC_TEST = "createTopicTest";
private final static Long REAL_CLUSTER_ID_IN_MYSQL = 1L;
private final static Integer REAL_BROKER_ID_IN_ZK = 1;
private final static Long INVALID_CLUSTER_ID = -1L;
private final static Integer INVALID_PARTITION_ID = -1;
private final static Integer REAL_PARTITION_ID = 0;
private final static Integer INVALID_BROKER_ID = -1;
private final static String APP_ID = "dkm_admin";
private final static Long INVALID_REGION_ID = -1L;
private final static Long REAL_REGION_ID_IN_MYSQL = 1L;
private final static String ADMIN = "admin";
@Autowired
private AdminService adminService;
@Autowired
private TopicManagerService topicManagerService;
private TopicDO getTopicDO() {
TopicDO topicDO = new TopicDO();
topicDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
topicDO.setTopicName(CREATE_TOPIC_TEST);
topicDO.setAppId(APP_ID);
topicDO.setDescription(CREATE_TOPIC_TEST);
topicDO.setPeakBytesIn(100000L);
return topicDO;
}
public ClusterDO getClusterDO() {
ClusterDO clusterDO = new ClusterDO();
clusterDO.setId(REAL_CLUSTER_ID_IN_MYSQL);
clusterDO.setClusterName("LogiKM_moduleTest");
clusterDO.setZookeeper("10.190.46.198:2181,10.190.14.237:2181,10.190.50.65:2181/xg");
clusterDO.setBootstrapServers("10.190.46.198:9093,10.190.14.237:9093,10.190.50.65:9093");
clusterDO.setSecurityProperties("{ \t\"security.protocol\": \"SASL_PLAINTEXT\", \t\"sasl.mechanism\": \"PLAIN\", \t\"sasl.jaas.config\": \"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\"dkm_admin\\\" password=\\\"km_kMl4N8as1Kp0CCY\\\";\" }");
clusterDO.setStatus(1);
clusterDO.setGmtCreate(new Date());
clusterDO.setGmtModify(new Date());
return clusterDO;
}
@Test(description = "测试创建topic")
public void createTopicTest() {
// broker not exist
createTopic2BrokerNotExistTest();
// success to create topic
createTopic2SuccessTest();
// failure to create topic, topic already exists
createTopic2FailureTest();
}
private void createTopic2BrokerNotExistTest() {
TopicDO topicDO = getTopicDO();
ClusterDO clusterDO = getClusterDO();
ResultStatus result = adminService.createTopic(
clusterDO,
topicDO,
1,
1,
1L,
Arrays.asList(INVALID_BROKER_ID),
new Properties(),
ADMIN,
ADMIN);
Assert.assertEquals(result.getCode(), ResultStatus.BROKER_NOT_EXIST.getCode());
}
private void createTopic2FailureTest() {
TopicDO topicDO = getTopicDO();
ClusterDO clusterDO = getClusterDO();
ResultStatus result = adminService.createTopic(
clusterDO,
topicDO,
1,
1,
INVALID_REGION_ID,
Arrays.asList(REAL_BROKER_ID_IN_ZK),
new Properties(),
ADMIN,
ADMIN);
Assert.assertNotEquals(result.getCode(), ResultStatus.SUCCESS.getCode());
}
private void createTopic2SuccessTest() {
TopicDO topicDO = getTopicDO();
ClusterDO clusterDO = getClusterDO();
ResultStatus result = adminService.createTopic(
clusterDO,
topicDO,
1,
1,
INVALID_REGION_ID,
Arrays.asList(REAL_BROKER_ID_IN_ZK),
new Properties(),
ADMIN,
ADMIN);
Assert.assertEquals(result.getCode(), ResultStatus.SUCCESS.getCode());
}
@Test(description = "测试删除topic")
public void deleteTopicTest() {
// topic does not exist
deleteTopic2FailureTest();
// success to delete
deleteTopic2SuccessTest();
}
private void deleteTopic2FailureTest() {
ClusterDO clusterDO = getClusterDO();
ResultStatus resultStatus = adminService.deleteTopic(
clusterDO,
INVALID_TOPIC,
ADMIN
);
Assert.assertNotEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode());
}
private void deleteTopic2SuccessTest() {
TopicDO topicDO = getTopicDO();
topicManagerService.addTopic(topicDO);
ClusterDO clusterDO = getClusterDO();
ResultStatus resultStatus = adminService.deleteTopic(
clusterDO,
CREATE_TOPIC_TEST,
ADMIN
);
Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode());
}
@Test(description = "测试优先副本选举状态")
public void preferredReplicaElectionStatusTest() {
// running
preferredReplicaElectionStatus2RunningTest();
// not running
preferredReplicaElectionStatus2NotRunningTest();
}
private void preferredReplicaElectionStatus2RunningTest() {
// zk上需要创建/admin/preferred_replica_election节点
ClusterDO clusterDO = getClusterDO();
TaskStatusEnum taskStatusEnum = adminService.preferredReplicaElectionStatus(clusterDO);
Assert.assertEquals(taskStatusEnum.getCode(), TaskStatusEnum.RUNNING.getCode());
}
private void preferredReplicaElectionStatus2NotRunningTest() {
ClusterDO clusterDO = getClusterDO();
// zk上无/admin/preferred_replica_election节点
TaskStatusEnum taskStatusEnum = adminService.preferredReplicaElectionStatus(clusterDO);
Assert.assertEquals(taskStatusEnum.getCode(), TaskStatusEnum.SUCCEED.getCode());
}
@Test(description = "测试集群纬度优先副本选举")
public void preferredReplicaElectionOfCluster2Test() {
ClusterDO clusterDO = getClusterDO();
ResultStatus resultStatus = adminService.preferredReplicaElection(clusterDO, ADMIN);
Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode());
}
@Test(description = "Broker纬度优先副本选举")
public void preferredReplicaElectionOfBrokerTest() {
// 参数异常
preferredReplicaElectionOfBroker2ParamIllegalTest();
// success
preferredReplicaElectionOfBroker2SuccessTest();
}
private void preferredReplicaElectionOfBroker2ParamIllegalTest() {
ClusterDO clusterDO = getClusterDO();
ResultStatus resultStatus = adminService.preferredReplicaElection(
clusterDO,
INVALID_BROKER_ID,
ADMIN
);
Assert.assertEquals(resultStatus.getCode(), ResultStatus.PARAM_ILLEGAL.getCode());
}
private void preferredReplicaElectionOfBroker2SuccessTest() {
ClusterDO clusterDO = getClusterDO();
ResultStatus resultStatus = adminService.preferredReplicaElection(
clusterDO,
REAL_BROKER_ID_IN_ZK,
ADMIN
);
Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode());
}
@Test(description = "Topic纬度优先副本选举")
public void preferredReplicaElectionOfTopicTest() {
// topic not exist
preferredReplicaElectionOfTopic2TopicNotExistTest();
// success
preferredReplicaElectionOfTopic2SuccessTest();
}
private void preferredReplicaElectionOfTopic2TopicNotExistTest() {
ClusterDO clusterDO = getClusterDO();
ResultStatus resultStatus = adminService.preferredReplicaElection(
clusterDO,
INVALID_TOPIC,
ADMIN
);
Assert.assertEquals(resultStatus.getCode(), ResultStatus.TOPIC_NOT_EXIST.getCode());
}
private void preferredReplicaElectionOfTopic2SuccessTest() {
ClusterDO clusterDO = getClusterDO();
ResultStatus resultStatus = adminService.preferredReplicaElection(
clusterDO,
REAL_TOPIC1_IN_ZK,
ADMIN
);
Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode());
}
@Test(description = "Topic纬度优先副本选举")
public void preferredReplicaElectionOfPartitionTest() {
// topic not exist
preferredReplicaElectionOfPartition2TopicNotExistTest();
// partition Not Exist
preferredReplicaElectionOfPartition2PartitionNotExistTest();
// success
preferredReplicaElectionOfPartition2SuccessTest();
}
private void preferredReplicaElectionOfPartition2TopicNotExistTest() {
ClusterDO clusterDO = getClusterDO();
ResultStatus resultStatus = adminService.preferredReplicaElection(
clusterDO,
INVALID_TOPIC,
INVALID_PARTITION_ID,
ADMIN
);
Assert.assertEquals(resultStatus.getCode(), ResultStatus.TOPIC_NOT_EXIST.getCode());
}
private void preferredReplicaElectionOfPartition2PartitionNotExistTest() {
ClusterDO clusterDO = getClusterDO();
ResultStatus resultStatus = adminService.preferredReplicaElection(
clusterDO,
REAL_TOPIC2_IN_ZK,
INVALID_PARTITION_ID,
ADMIN
);
Assert.assertEquals(resultStatus.getCode(), ResultStatus.PARTITION_NOT_EXIST.getCode());
}
private void preferredReplicaElectionOfPartition2SuccessTest() {
ClusterDO clusterDO = getClusterDO();
ResultStatus resultStatus = adminService.preferredReplicaElection(
clusterDO,
REAL_TOPIC2_IN_ZK,
REAL_PARTITION_ID,
ADMIN
);
Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode());
}
@Test(description = "测试获取Topic配置")
public void getTopicConfigTest() {
// result is null
getTopicConfig2NullTest();
// result not null
getTopicConfig2NotNullTest();
}
private void getTopicConfig2NullTest() {
ClusterDO clusterDO = getClusterDO();
clusterDO.setId(INVALID_CLUSTER_ID);
Properties topicConfig = adminService.getTopicConfig(clusterDO, REAL_TOPIC1_IN_ZK);
Assert.assertNull(topicConfig);
}
private void getTopicConfig2NotNullTest() {
ClusterDO clusterDO = getClusterDO();
Properties topicConfig = adminService.getTopicConfig(clusterDO, REAL_TOPIC1_IN_ZK);
Assert.assertNotNull(topicConfig);
}
@Test(description = "测试修改Topic配置")
public void modifyTopicConfigTest() {
ClusterDO clusterDO = getClusterDO();
Properties properties = new Properties();
properties.put("retention.ms", "21600000");
ResultStatus resultStatus = adminService.modifyTopicConfig(
clusterDO,
REAL_TOPIC1_IN_ZK,
properties,
ADMIN
);
Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode());
}
@Test(description = "测试扩分区")
public void expandPartitionsTest() {
// broker not exist
expandPartitions2BrokerNotExistTest();
// success
expandPartitions2SuccessTest();
}
private void expandPartitions2BrokerNotExistTest() {
// 存在两个下线broker, region中包含一个
ClusterDO clusterDO = getClusterDO();
ResultStatus resultStatus = adminService.expandPartitions(
clusterDO,
REAL_TOPIC1_IN_ZK,
2,
REAL_REGION_ID_IN_MYSQL,
Arrays.asList(INVALID_BROKER_ID),
ADMIN
);
Assert.assertEquals(resultStatus.getCode(), ResultStatus.BROKER_NOT_EXIST.getCode());
}
private void expandPartitions2SuccessTest() {
ClusterDO clusterDO = getClusterDO();
ResultStatus resultStatus = adminService.expandPartitions(
clusterDO,
REAL_TOPIC1_IN_ZK,
2,
INVALID_REGION_ID,
Arrays.asList(REAL_BROKER_ID_IN_ZK),
ADMIN
);
Assert.assertEquals(resultStatus.getCode(), ResultStatus.BROKER_NOT_EXIST.getCode());
}
}

View File

@@ -27,13 +27,13 @@ public class AppServiceTest extends BaseTest {
public static Object[][] provideAppDO() {
AppDO appDO = new AppDO();
appDO.setId(4L);
appDO.setAppId("testAppId");
appDO.setName("testApp");
appDO.setPassword("testApp");
appDO.setAppId("moduleTestAppId");
appDO.setName("moduleTestApp");
appDO.setPassword("moduleTestApp");
appDO.setType(1);
appDO.setApplicant("admin");
appDO.setPrincipals("admin");
appDO.setDescription("testApp");
appDO.setDescription("moduleTestApp");
appDO.setCreateTime(new Date(1638786493173L));
appDO.setModifyTime(new Date(1638786493173L));
return new Object[][] {{appDO}};
@@ -59,7 +59,6 @@ public class AppServiceTest extends BaseTest {
addApp2MysqlErrorTest();
}
@Rollback(false)
private void addApp2SuccessTest(AppDO appDO) {
ResultStatus addAppResult = appService.addApp(appDO, "admin");
Assert.assertEquals(addAppResult.getCode(), ResultStatus.SUCCESS.getCode());
@@ -70,7 +69,6 @@ public class AppServiceTest extends BaseTest {
Assert.assertEquals(addAppResult.getCode(), ResultStatus.RESOURCE_ALREADY_EXISTED.getCode());
}
@Rollback()
private void addApp2MysqlErrorTest() {
ResultStatus addAppResult = appService.addApp(new AppDO(), "admin");
Assert.assertEquals(addAppResult.getCode(), ResultStatus.MYSQL_ERROR.getCode());
@@ -78,20 +76,24 @@ public class AppServiceTest extends BaseTest {
@Test(dataProvider = "provideAppDO")
public void deleteAppTest(AppDO appDO) {
appService.addApp(appDO, "admin");
// 测试删除app成功
deleteApp2SuccessTest(appDO);
// 测试删除app失败
deleteApp2FailureTest();
deleteApp2FailureTest(appDO);
}
@Rollback()
private void deleteApp2SuccessTest(AppDO appDO) {
appService.addApp(appDO, "admin");
int result = appService.deleteApp(appDO, "admin");
Assert.assertEquals(result, 1);
}
@Rollback()
private void deleteApp2FailureTest() {
private void deleteApp2FailureTest(AppDO appDO) {
appService.addApp(appDO, "admin");
int result = appService.deleteApp(new AppDO(), "admin");
Assert.assertEquals(result, 0);
}
@@ -116,7 +118,6 @@ public class AppServiceTest extends BaseTest {
Assert.assertEquals(result.getCode(), ResultStatus.USER_WITHOUT_AUTHORITY.getCode());
}
@Rollback()
private void updateByAppId2SucessTest(AppDTO appDTO) {
ResultStatus result1 = appService.updateByAppId(appDTO, "admin", false);
Assert.assertEquals(result1.getCode(), ResultStatus.SUCCESS.getCode());

View File

@@ -0,0 +1,85 @@
package com.xiaojukeji.kafka.manager.service.service.gateway;
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.TopicConnection;
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.TopicConnectionDO;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import org.springframework.beans.factory.annotation.Autowired;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* @author xuguang
* @Date 2021/12/7
*/
public class TopicConnectionServiceTest extends BaseTest {
@Autowired
private TopicConnectionService topicConnectionService;
private static final String TOPIC_NAME = "moduleTest";
private static final Long CLUSTER_ID = 1L;
private static final String APP_ID = "dkm_admin";
@DataProvider(name = "provideTopicConnection")
public static Object[][] provideTopicConnection() {
TopicConnectionDO topicConnectionDO = new TopicConnectionDO();
topicConnectionDO.setId(13L);
topicConnectionDO.setAppId(APP_ID);
topicConnectionDO.setClusterId(CLUSTER_ID);
topicConnectionDO.setTopicName(TOPIC_NAME);
topicConnectionDO.setType("fetch");
topicConnectionDO.setIp("172.23.142.253");
topicConnectionDO.setClientVersion("2.4");
topicConnectionDO.setCreateTime(new Date(1638786493173L));
return new Object[][] {{topicConnectionDO}};
}
// 测试批量插入为空的情况
@Test
private void batchAdd2EmptyTest() {
topicConnectionService.batchAdd(new ArrayList<>());
}
// 测试批量插入成功的情况通过调整list的数量和TopicConnectionServiceImpl中splitInterval的数量使每个流程都测试一遍
@Test(dataProvider = "provideTopicConnection")
private void batchAdd2SuccessTest(TopicConnectionDO topicConnectionDO) {
List<TopicConnectionDO> list = new ArrayList<>();
list.add(topicConnectionDO);
list.add(topicConnectionDO);
list.add(topicConnectionDO);
topicConnectionService.batchAdd(list);
}
@Test(dataProvider = "provideTopicConnection")
public void getByTopicName2Test(TopicConnectionDO topicConnectionDO) {
List<TopicConnection> result = topicConnectionService.getByTopicName(CLUSTER_ID, TOPIC_NAME, new Date(0L), new Date());
Assert.assertEquals(result.size(), 1);
Assert.assertEquals(result.get(0).toString(), topicConnectionDO.toString());
}
// 测试获取数据时为空
@Test
public void getByTopicName2EmptyTest() {
List<TopicConnection> result = topicConnectionService.getByTopicName(100L, "xgTestxxx", new Date(0L), new Date());
Assert.assertTrue(result.isEmpty());
}
// 测试获取数据,clusterId不为nullTODO
@Test(dataProvider = "provideTopicConnection")
public void getByTopicName2SuccessTest(TopicConnectionDO topicConnectionDO) {
List<TopicConnectionDO> list = new ArrayList<>();
list.add(topicConnectionDO);
topicConnectionService.batchAdd(list);
List<TopicConnection> result = topicConnectionService.getByTopicName(CLUSTER_ID, TOPIC_NAME, new Date(0L), new Date());
Assert.assertTrue(result.stream().anyMatch(topicConnection -> topicConnection.getTopicName().equals(TOPIC_NAME)
&& topicConnection.getClusterId().equals(CLUSTER_ID)));
}
}

View File

@@ -0,0 +1,234 @@
package com.xiaojukeji.kafka.manager.service.utils;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Properties;
/**
* @author xuguang
* @Date 2022/1/6
*/
public class TopicCommandsTest extends BaseTest {
private final static Long REAL_CLUSTER_ID_IN_MYSQL = 1L;
private final static String TEST_CREATE_TOPIC = "createTopicTest";
private final static String REAL_TOPIC_IN_ZK = "moduleTest";
private final static String INVALID_TOPIC = ".,&";
private final static Integer PARTITION_NUM = 1;
private final static Integer REPLICA_NUM = 1;
private final static Integer BROKER_ID = 1;
public ClusterDO getClusterDO() {
ClusterDO clusterDO = new ClusterDO();
clusterDO.setId(REAL_CLUSTER_ID_IN_MYSQL);
clusterDO.setClusterName("LogiKM_moduleTest");
clusterDO.setZookeeper("10.190.46.198:2181,10.190.14.237:2181,10.190.50.65:2181/xg");
clusterDO.setBootstrapServers("10.190.46.198:9093,10.190.14.237:9093,10.190.50.65:9093");
clusterDO.setSecurityProperties("{ \t\"security.protocol\": \"SASL_PLAINTEXT\", \t\"sasl.mechanism\": \"PLAIN\", \t\"sasl.jaas.config\": \"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\"dkm_admin\\\" password=\\\"km_kMl4N8as1Kp0CCY\\\";\" }");
clusterDO.setStatus(1);
clusterDO.setGmtCreate(new Date());
clusterDO.setGmtModify(new Date());
return clusterDO;
}
@Test(description = "测试创建topic")
public void createTopicTest() {
// NullPointerException
createTopic2NullPointerExceptionTest();
// InvalidPartitions
createTopic2InvalidPartitionsTest();
// InvalidReplicationFactor
createTopic2InvalidReplicationFactorTest();
// topic exists
createTopic2TopicExistsTest();
// invalid topic
createTopic2InvalidTopicTest();
// success
createTopic2SuccessTest();
}
private void createTopic2NullPointerExceptionTest() {
ClusterDO clusterDO = getClusterDO();
clusterDO.setZookeeper(null);
clusterDO.setBootstrapServers(null);
ResultStatus result = TopicCommands.createTopic(
clusterDO,
TEST_CREATE_TOPIC,
PARTITION_NUM,
REPLICA_NUM,
Arrays.asList(BROKER_ID),
new Properties()
);
Assert.assertEquals(result.getCode(), ResultStatus.TOPIC_OPERATION_PARAM_NULL_POINTER.getCode());
}
private void createTopic2InvalidPartitionsTest() {
ClusterDO clusterDO = getClusterDO();
ResultStatus result = TopicCommands.createTopic(
clusterDO,
TEST_CREATE_TOPIC,
-1,
REPLICA_NUM,
Arrays.asList(BROKER_ID),
new Properties()
);
Assert.assertEquals(result.getCode(), ResultStatus.TOPIC_OPERATION_PARTITION_NUM_ILLEGAL.getCode());
}
private void createTopic2InvalidReplicationFactorTest() {
ClusterDO clusterDO = getClusterDO();
ResultStatus result = TopicCommands.createTopic(
clusterDO,
TEST_CREATE_TOPIC,
PARTITION_NUM,
REPLICA_NUM,
Collections.emptyList(),
new Properties()
);
Assert.assertEquals(result.getCode(), ResultStatus.BROKER_NUM_NOT_ENOUGH.getCode());
}
private void createTopic2TopicExistsTest() {
ClusterDO clusterDO = getClusterDO();
ResultStatus result = TopicCommands.createTopic(
clusterDO,
REAL_TOPIC_IN_ZK,
PARTITION_NUM,
REPLICA_NUM,
Arrays.asList(BROKER_ID),
new Properties()
);
Assert.assertEquals(result.getCode(), ResultStatus.TOPIC_OPERATION_TOPIC_EXISTED.getCode());
}
private void createTopic2InvalidTopicTest() {
ClusterDO clusterDO = getClusterDO();
ResultStatus result = TopicCommands.createTopic(
clusterDO,
INVALID_TOPIC,
PARTITION_NUM,
REPLICA_NUM,
Arrays.asList(BROKER_ID),
new Properties()
);
Assert.assertEquals(result.getCode(), ResultStatus.TOPIC_OPERATION_TOPIC_NAME_ILLEGAL.getCode());
}
private void createTopic2SuccessTest() {
ClusterDO clusterDO = getClusterDO();
ResultStatus result = TopicCommands.createTopic(
clusterDO,
TEST_CREATE_TOPIC,
PARTITION_NUM,
REPLICA_NUM,
Arrays.asList(BROKER_ID),
new Properties()
);
Assert.assertEquals(result.getCode(), ResultStatus.SUCCESS.getCode());
}
@Test(description = "测试修改topic配置")
public void modifyTopicConfigTest() {
// AdminOperationException
modifyTopicConfig2AdminOperationExceptionTest();
// InvalidConfigurationException
modifyTopicConfig2InvalidConfigurationExceptionTest();
// success
modifyTopicConfig2SuccessTest();
}
private void modifyTopicConfig2AdminOperationExceptionTest() {
ClusterDO clusterDO = getClusterDO();
ResultStatus result = TopicCommands.modifyTopicConfig(clusterDO, INVALID_TOPIC, new Properties());
Assert.assertEquals(result.getCode(), ResultStatus.TOPIC_OPERATION_UNKNOWN_TOPIC_PARTITION.getCode());
}
private void modifyTopicConfig2InvalidConfigurationExceptionTest() {
ClusterDO clusterDO = getClusterDO();
Properties properties = new Properties();
properties.setProperty("xxx", "xxx");
ResultStatus result = TopicCommands.modifyTopicConfig(clusterDO, REAL_TOPIC_IN_ZK, properties);
Assert.assertEquals(result.getCode(), ResultStatus.TOPIC_OPERATION_TOPIC_CONFIG_ILLEGAL.getCode());
}
private void modifyTopicConfig2SuccessTest() {
ClusterDO clusterDO = getClusterDO();
Properties properties = new Properties();
ResultStatus result = TopicCommands.modifyTopicConfig(clusterDO, REAL_TOPIC_IN_ZK, properties);
Assert.assertEquals(result.getCode(), ResultStatus.SUCCESS.getCode());
}
@Test(description = "测试扩分区")
public void expandTopicTest() {
// failed
expandTopic2FailureTest();
// success
expandTopic2SuccessTest();
}
private void expandTopic2FailureTest() {
ClusterDO clusterDO = getClusterDO();
ResultStatus result = TopicCommands.expandTopic(
clusterDO,
INVALID_TOPIC,
PARTITION_NUM + 1,
Arrays.asList(BROKER_ID, 2)
);
Assert.assertEquals(result.getCode(), ResultStatus.TOPIC_OPERATION_UNKNOWN_ERROR.getCode());
}
private void expandTopic2SuccessTest() {
ClusterDO clusterDO = getClusterDO();
ResultStatus result = TopicCommands.expandTopic(
clusterDO,
TEST_CREATE_TOPIC,
PARTITION_NUM + 1,
Arrays.asList(BROKER_ID, 2)
);
Assert.assertEquals(result.getCode(), ResultStatus.SUCCESS.getCode());
}
@Test(description = "测试删除Topic")
public void deleteTopicTest() {
// UnknownTopicOrPartitionException
deleteTopic2UnknownTopicOrPartitionExceptionTest();
// NullPointerException
deleteTopic2NullPointerExceptionTest();
// success
deleteTopic2SuccessTest();
}
private void deleteTopic2UnknownTopicOrPartitionExceptionTest() {
ClusterDO clusterDO = getClusterDO();
ResultStatus result = TopicCommands.deleteTopic(clusterDO, INVALID_TOPIC);
Assert.assertEquals(result.getCode(), ResultStatus.TOPIC_OPERATION_UNKNOWN_TOPIC_PARTITION.getCode());
}
private void deleteTopic2NullPointerExceptionTest() {
ClusterDO clusterDO = getClusterDO();
clusterDO.setBootstrapServers(null);
clusterDO.setZookeeper(null);
ResultStatus result = TopicCommands.deleteTopic(clusterDO, INVALID_TOPIC);
Assert.assertEquals(result.getCode(), ResultStatus.TOPIC_OPERATION_UNKNOWN_ERROR.getCode());
}
private void deleteTopic2SuccessTest() {
ClusterDO clusterDO = getClusterDO();
ResultStatus result = TopicCommands.deleteTopic(clusterDO, TEST_CREATE_TOPIC);
Assert.assertEquals(result.getCode(), ResultStatus.SUCCESS.getCode());
}
}

View File

@@ -0,0 +1,78 @@
package com.xiaojukeji.kafka.manager.service.utils;
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
import org.junit.Assert;
import org.testng.annotations.Test;
import java.util.Arrays;
import java.util.Date;
/**
* @author xuguang
* @Date 2022/1/6
*/
public class TopicReassignUtilsTest extends BaseTest {
private final static Long REAL_CLUSTER_ID_IN_MYSQL = 1L;
private final static String TEST_CREATE_TOPIC = "createTopicTest";
private final static String REAL_TOPIC_IN_ZK = "moduleTest";
private final static String INVALID_TOPIC = ".,&";
private final static Integer PARTITION_NUM = 1;
private final static Integer REPLICA_NUM = 1;
private final static Integer BROKER_ID = 1;
private final static Integer PARTITION_ID = 1;
public ClusterDO getClusterDO() {
ClusterDO clusterDO = new ClusterDO();
clusterDO.setId(REAL_CLUSTER_ID_IN_MYSQL);
clusterDO.setClusterName("LogiKM_moduleTest");
clusterDO.setZookeeper("10.190.46.198:2181,10.190.14.237:2181,10.190.50.65:2181/xg");
clusterDO.setBootstrapServers("10.190.46.198:9093,10.190.14.237:9093,10.190.50.65:9093");
clusterDO.setSecurityProperties("{ \t\"security.protocol\": \"SASL_PLAINTEXT\", \t\"sasl.mechanism\": \"PLAIN\", \t\"sasl.jaas.config\": \"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\"dkm_admin\\\" password=\\\"km_kMl4N8as1Kp0CCY\\\";\" }");
clusterDO.setStatus(1);
clusterDO.setGmtCreate(new Date());
clusterDO.setGmtModify(new Date());
return clusterDO;
}
@Test
public void generateReassignmentJsonTest() {
// null
generateReassignmentJson2NullPointerExceptionTest();
// not null
generateReassignmentJson2SuccessTest();
}
private void generateReassignmentJson2NullPointerExceptionTest() {
ClusterDO clusterDO = getClusterDO();
clusterDO.setZookeeper(null);
String result = TopicReassignUtils.generateReassignmentJson(
clusterDO,
REAL_TOPIC_IN_ZK,
Arrays.asList(PARTITION_ID),
Arrays.asList(BROKER_ID)
);
Assert.assertNull(result);
}
private void generateReassignmentJson2SuccessTest() {
ClusterDO clusterDO = getClusterDO();
String result = TopicReassignUtils.generateReassignmentJson(
clusterDO,
REAL_TOPIC_IN_ZK,
Arrays.asList(PARTITION_ID),
Arrays.asList(BROKER_ID)
);
Assert.assertNotNull(result);
}
}