mirror of
https://github.com/didi/KnowStreaming.git
synced 2026-01-03 11:28:12 +08:00
单元测试:AnalysisServiceTest && ConsumerServiceTest && JmxServiceTest &&
LogicalClusterServiceTest && ReassignServiceTest && TopicServiceTest
This commit is contained in:
@@ -0,0 +1,54 @@
|
|||||||
|
package com.xiaojukeji.kafka.manager.service.service;
|
||||||
|
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.ao.analysis.AnalysisBrokerDTO;
|
||||||
|
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
|
||||||
|
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.testng.Assert;
|
||||||
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author xuguang
|
||||||
|
* @Date 2021/12/23
|
||||||
|
*/
|
||||||
|
public class AnalysisServiceTest extends BaseTest {
|
||||||
|
|
||||||
|
private final static Long REAL_CLUSTER_ID_IN_MYSQL = 1L;
|
||||||
|
|
||||||
|
private final static Integer REAL_BROKER_ID_IN_ZK = 1;
|
||||||
|
|
||||||
|
private final static Long INVALID_CLUSTER_ID = -1L;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private AnalysisService analysisService;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void doAnalysisBrokerTest() {
|
||||||
|
// brokerMetrics is null
|
||||||
|
doAnalysisBroker2brokerMetricsIsNullTest();
|
||||||
|
// brokerMetrics is not null
|
||||||
|
doAnalysisBroker2brokerMetricsIsNotNullTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doAnalysisBroker2brokerMetricsIsNullTest() {
|
||||||
|
AnalysisBrokerDTO analysisBrokerDTO = analysisService.doAnalysisBroker(
|
||||||
|
INVALID_CLUSTER_ID,
|
||||||
|
REAL_BROKER_ID_IN_ZK
|
||||||
|
);
|
||||||
|
Assert.assertNotNull(analysisBrokerDTO);
|
||||||
|
Assert.assertEquals(analysisBrokerDTO.getBrokerId(), REAL_BROKER_ID_IN_ZK);
|
||||||
|
Assert.assertEquals(analysisBrokerDTO.getClusterId(), INVALID_CLUSTER_ID);
|
||||||
|
Assert.assertNull(analysisBrokerDTO.getBytesIn());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void doAnalysisBroker2brokerMetricsIsNotNullTest() {
|
||||||
|
AnalysisBrokerDTO analysisBrokerDTO = analysisService.doAnalysisBroker(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
REAL_BROKER_ID_IN_ZK
|
||||||
|
);
|
||||||
|
Assert.assertNotNull(analysisBrokerDTO);
|
||||||
|
Assert.assertEquals(analysisBrokerDTO.getBrokerId(), REAL_BROKER_ID_IN_ZK);
|
||||||
|
Assert.assertEquals(analysisBrokerDTO.getClusterId(), REAL_CLUSTER_ID_IN_MYSQL);
|
||||||
|
Assert.assertNotNull(analysisBrokerDTO.getBytesIn());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,264 @@
|
|||||||
|
package com.xiaojukeji.kafka.manager.service.service;
|
||||||
|
|
||||||
|
import com.xiaojukeji.kafka.manager.common.bizenum.OffsetLocationEnum;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionOffsetDTO;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumeDetailDTO;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumerGroup;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumerGroupSummary;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||||
|
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
|
||||||
|
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.testng.Assert;
|
||||||
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 测试消费组消费情况需要保证集群中存在消费组
|
||||||
|
* @author xuguang
|
||||||
|
* @Date 2021/12/23
|
||||||
|
*/
|
||||||
|
public class ConsumerServiceTest extends BaseTest {
|
||||||
|
|
||||||
|
private final static Long REAL_CLUSTER_ID_IN_MYSQL = 1L;
|
||||||
|
|
||||||
|
private final static Integer REAL_BROKER_ID_IN_ZK = 1;
|
||||||
|
|
||||||
|
private final static Long INVALID_CLUSTER_ID = -1L;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 集群共包括三个broker:1,2,3, 该topic 1分区 1副本因子,在broker1上
|
||||||
|
*/
|
||||||
|
private final static String REAL_TOPIC1_IN_ZK = "moduleTest";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 集群共包括三个broker:1,2,3, 该topic 2分区 3副本因子,在broker1,2,3上
|
||||||
|
*/
|
||||||
|
private final static String REAL_TOPIC2_IN_ZK = "xgTest";
|
||||||
|
|
||||||
|
private final static String INVALID_TOPIC = "xxxxxx";
|
||||||
|
|
||||||
|
private final static String REAL_CONSUMER_GROUP_NAME = "moduleTestGroup";
|
||||||
|
|
||||||
|
private final static String INVALID_CONSUMER_GROUP_NAME = "xxxxxxxx";
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private ConsumerService consumerService;
|
||||||
|
|
||||||
|
private ClusterDO getClusterDO() {
|
||||||
|
ClusterDO clusterDO = new ClusterDO();
|
||||||
|
clusterDO.setId(1L);
|
||||||
|
clusterDO.setClusterName("LogiKM_moduleTest");
|
||||||
|
clusterDO.setZookeeper("10.190.46.198:2181,10.190.14.237:2181,10.190.50.65:2181/xg");
|
||||||
|
clusterDO.setBootstrapServers("10.190.46.198:9093,10.190.14.237:9093,10.190.50.65:9093");
|
||||||
|
clusterDO.setSecurityProperties("{ \t\"security.protocol\": \"SASL_PLAINTEXT\", \t\"sasl.mechanism\": \"PLAIN\", \t\"sasl.jaas.config\": \"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\"dkm_admin\\\" password=\\\"km_kMl4N8as1Kp0CCY\\\";\" }");
|
||||||
|
clusterDO.setStatus(1);
|
||||||
|
clusterDO.setGmtCreate(new Date());
|
||||||
|
clusterDO.setGmtModify(new Date());
|
||||||
|
return clusterDO;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ConsumerGroup getConsumerGroup() {
|
||||||
|
return new ConsumerGroup(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
REAL_CONSUMER_GROUP_NAME,
|
||||||
|
OffsetLocationEnum.BROKER);
|
||||||
|
}
|
||||||
|
|
||||||
|
private PartitionOffsetDTO getPartitionOffsetDTO() {
|
||||||
|
PartitionOffsetDTO partitionOffsetDTO = new PartitionOffsetDTO();
|
||||||
|
partitionOffsetDTO.setOffset(0L);
|
||||||
|
partitionOffsetDTO.setPartitionId(0);
|
||||||
|
return partitionOffsetDTO;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试获取消费组列表")
|
||||||
|
public void getConsumerGroupListTest() {
|
||||||
|
List<ConsumerGroup> consumerGroupList = consumerService.getConsumerGroupList(REAL_CLUSTER_ID_IN_MYSQL);
|
||||||
|
Assert.assertFalse(consumerGroupList.isEmpty());
|
||||||
|
Assert.assertTrue(consumerGroupList.stream().allMatch(consumerGroup ->
|
||||||
|
consumerGroup.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试查询消费Topic的消费组")
|
||||||
|
public void getConsumerGroupListWithTopicTest() {
|
||||||
|
List<ConsumerGroup> consumerGroupList = consumerService.getConsumerGroupList(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
REAL_TOPIC1_IN_ZK
|
||||||
|
);
|
||||||
|
Assert.assertFalse(consumerGroupList.isEmpty());
|
||||||
|
Assert.assertTrue(consumerGroupList.stream().allMatch(consumerGroup ->
|
||||||
|
consumerGroup.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试获取消费Topic的消费组概要信息")
|
||||||
|
public void getConsumerGroupSummariesTest() {
|
||||||
|
// result is empty
|
||||||
|
getConsumerGroupSummaries2EmptyTest();
|
||||||
|
// result is not empty
|
||||||
|
getConsumerGroupSummaries2NotEmptyTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getConsumerGroupSummaries2EmptyTest() {
|
||||||
|
List<ConsumerGroupSummary> consumerGroupSummaries = consumerService.getConsumerGroupSummaries(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
INVALID_TOPIC
|
||||||
|
);
|
||||||
|
Assert.assertTrue(consumerGroupSummaries.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getConsumerGroupSummaries2NotEmptyTest() {
|
||||||
|
List<ConsumerGroupSummary> consumerGroupSummaries = consumerService.getConsumerGroupSummaries(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
REAL_TOPIC1_IN_ZK
|
||||||
|
);
|
||||||
|
Assert.assertFalse(consumerGroupSummaries.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试查询消费详情")
|
||||||
|
public void getConsumeDetail() {
|
||||||
|
// result is empty
|
||||||
|
getConsumeDetail2Empty();
|
||||||
|
// result is not empty
|
||||||
|
getConsumeDetail2NotEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getConsumeDetail2Empty() {
|
||||||
|
ClusterDO clusterDO = getClusterDO();
|
||||||
|
List<ConsumeDetailDTO> consumeDetail1 =
|
||||||
|
consumerService.getConsumeDetail(clusterDO, INVALID_TOPIC, null);
|
||||||
|
Assert.assertTrue(consumeDetail1.isEmpty());
|
||||||
|
|
||||||
|
ConsumerGroup consumerGroup = getConsumerGroup();
|
||||||
|
consumerGroup.setOffsetStoreLocation(null);
|
||||||
|
List<ConsumeDetailDTO> consumeDetail2 =
|
||||||
|
consumerService.getConsumeDetail(clusterDO, REAL_TOPIC1_IN_ZK, consumerGroup);
|
||||||
|
Assert.assertTrue(consumeDetail2.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getConsumeDetail2NotEmpty() {
|
||||||
|
ClusterDO clusterDO = getClusterDO();
|
||||||
|
ConsumerGroup consumerGroup = getConsumerGroup();
|
||||||
|
List<ConsumeDetailDTO> consumeDetail1 =
|
||||||
|
consumerService.getConsumeDetail(clusterDO, REAL_TOPIC1_IN_ZK, consumerGroup);
|
||||||
|
Assert.assertFalse(consumeDetail1.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试获取消费组消费的Topic列表")
|
||||||
|
public void getConsumerGroupConsumedTopicListTest() {
|
||||||
|
// result is empty
|
||||||
|
getConsumerGroupConsumedTopicList2Empty();
|
||||||
|
// result is not empty
|
||||||
|
getConsumerGroupConsumedTopicList2NotEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getConsumerGroupConsumedTopicList2Empty() {
|
||||||
|
List<String> list = consumerService.getConsumerGroupConsumedTopicList(
|
||||||
|
null,
|
||||||
|
null,
|
||||||
|
null);
|
||||||
|
Assert.assertTrue(list.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getConsumerGroupConsumedTopicList2NotEmpty() {
|
||||||
|
List<String> list = consumerService.getConsumerGroupConsumedTopicList(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
REAL_CONSUMER_GROUP_NAME,
|
||||||
|
"broker");
|
||||||
|
Assert.assertFalse(list.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试获取消费者offset")
|
||||||
|
public void getConsumerOffsetTest() {
|
||||||
|
// result is null
|
||||||
|
getConsumerOffset2NullTest();
|
||||||
|
// result is not null
|
||||||
|
getConsumerOffset2NotNullTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getConsumerOffset2NullTest() {
|
||||||
|
Map<Integer, Long> consumerOffset1 = consumerService.getConsumerOffset(null, null, null);
|
||||||
|
Assert.assertNull(consumerOffset1);
|
||||||
|
|
||||||
|
ClusterDO clusterDO = getClusterDO();
|
||||||
|
ConsumerGroup consumerGroup = getConsumerGroup();
|
||||||
|
consumerGroup.setOffsetStoreLocation(null);
|
||||||
|
Map<Integer, Long> consumerOffset2 = consumerService.getConsumerOffset(
|
||||||
|
clusterDO,
|
||||||
|
REAL_TOPIC1_IN_ZK,
|
||||||
|
consumerGroup
|
||||||
|
);
|
||||||
|
Assert.assertNull(consumerOffset2);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getConsumerOffset2NotNullTest() {
|
||||||
|
ClusterDO clusterDO = getClusterDO();
|
||||||
|
ConsumerGroup consumerGroup = getConsumerGroup();
|
||||||
|
Map<Integer, Long> consumerOffset = consumerService.getConsumerOffset(
|
||||||
|
clusterDO,
|
||||||
|
REAL_TOPIC1_IN_ZK,
|
||||||
|
consumerGroup
|
||||||
|
);
|
||||||
|
Assert.assertNotNull(consumerOffset);
|
||||||
|
Assert.assertFalse(consumerOffset.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试获取每个集群消费组的个数")
|
||||||
|
public void getConsumerGroupNumMapTest() {
|
||||||
|
ClusterDO clusterDO = getClusterDO();
|
||||||
|
Map<Long, Integer> map = consumerService.getConsumerGroupNumMap(Arrays.asList(clusterDO));
|
||||||
|
Assert.assertFalse(map.isEmpty());
|
||||||
|
Assert.assertTrue(clusterDO.getId() >= 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "验证消费组是否存在")
|
||||||
|
public void checkConsumerGroupExistTest() {
|
||||||
|
// 不存在
|
||||||
|
checkConsumerGroupExist2FalseTest();
|
||||||
|
// 存在
|
||||||
|
checkConsumerGroupExist2TrueTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkConsumerGroupExist2FalseTest() {
|
||||||
|
boolean result = consumerService.checkConsumerGroupExist(
|
||||||
|
OffsetLocationEnum.BROKER,
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
REAL_TOPIC1_IN_ZK,
|
||||||
|
INVALID_CONSUMER_GROUP_NAME
|
||||||
|
);
|
||||||
|
Assert.assertFalse(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkConsumerGroupExist2TrueTest() {
|
||||||
|
boolean result = consumerService.checkConsumerGroupExist(
|
||||||
|
OffsetLocationEnum.BROKER,
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
REAL_TOPIC1_IN_ZK,
|
||||||
|
REAL_CONSUMER_GROUP_NAME
|
||||||
|
);
|
||||||
|
Assert.assertTrue(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试重置offset")
|
||||||
|
public void resetConsumerOffsetTest() {
|
||||||
|
ClusterDO clusterDO = getClusterDO();
|
||||||
|
ConsumerGroup consumerGroup = getConsumerGroup();
|
||||||
|
PartitionOffsetDTO partitionOffsetDTO1 = getPartitionOffsetDTO();
|
||||||
|
List<Result> results = consumerService.resetConsumerOffset(
|
||||||
|
clusterDO,
|
||||||
|
REAL_TOPIC1_IN_ZK,
|
||||||
|
consumerGroup,
|
||||||
|
Arrays.asList(partitionOffsetDTO1)
|
||||||
|
);
|
||||||
|
Assert.assertFalse(results.isEmpty());
|
||||||
|
Assert.assertTrue(results.stream().allMatch(result ->
|
||||||
|
result.getCode() == ResultStatus.SUCCESS.getCode()));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,386 @@
|
|||||||
|
package com.xiaojukeji.kafka.manager.service.service;
|
||||||
|
|
||||||
|
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionAttributeDTO;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.PartitionState;
|
||||||
|
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
|
||||||
|
import org.apache.kafka.common.TopicPartition;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.testng.Assert;
|
||||||
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author xuguang
|
||||||
|
* @Date 2021/12/14
|
||||||
|
*/
|
||||||
|
public class JmxServiceTest extends BaseTest {
|
||||||
|
/**
|
||||||
|
* 集群共包括三个broker:1,2,3, 该topic 1分区 1副本因子,在broker1上
|
||||||
|
*/
|
||||||
|
private final static String REAL_TOPIC1_IN_ZK = "moduleTest";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 集群共包括三个broker:1,2,3, 该topic 2分区 3副本因子,在broker1,2,3上
|
||||||
|
*/
|
||||||
|
private final static String REAL_TOPIC2_IN_ZK = "xgTest";
|
||||||
|
|
||||||
|
private final static String INVALID_TOPIC = "xxxxx";
|
||||||
|
|
||||||
|
private final static String ZK_DEFAULT_TOPIC = "_consumer_offsets";
|
||||||
|
|
||||||
|
private final static String NO_OFFSET_CHANGE_TOPIC_IN_ZK = "NoOffsetChangeTopic";
|
||||||
|
|
||||||
|
private final static Long REAL_CLUSTER_ID_IN_MYSQL = 1L;
|
||||||
|
|
||||||
|
private final static Integer REAL_BROKER_ID_IN_ZK = 1;
|
||||||
|
|
||||||
|
private final static Integer INVALID_BROKER_ID = -1;
|
||||||
|
|
||||||
|
private final static Long INVALID_CLUSTER_ID = -1L;
|
||||||
|
|
||||||
|
private final static Integer INVALID_PARTITION_ID = -1;
|
||||||
|
|
||||||
|
private final static String CLIENT_ID = "dkm_admin.moduleTest";
|
||||||
|
|
||||||
|
private final static Integer INVALID_METRICS_CODE = -1;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private JmxService jmxService;
|
||||||
|
|
||||||
|
private PartitionState getPartitionState() {
|
||||||
|
PartitionState partitionState = new PartitionState();
|
||||||
|
partitionState.setPartitionId(0);
|
||||||
|
partitionState.setLeader(2);
|
||||||
|
return partitionState;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void getBrokerMetricsTest() {
|
||||||
|
// 结果为空
|
||||||
|
getBrokerMetrics2NullTest();
|
||||||
|
// mbeanV2ListEmpty
|
||||||
|
getBrokerMetrics2mbeanV2ListEmptyTest();
|
||||||
|
// 获取成功
|
||||||
|
getBrokerMetrics2SuccessTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getBrokerMetrics2NullTest() {
|
||||||
|
BrokerMetrics brokerMetrics1 = jmxService.getBrokerMetrics(null, null, null);
|
||||||
|
Assert.assertNull(brokerMetrics1);
|
||||||
|
|
||||||
|
BrokerMetrics brokerMetrics2 = jmxService.getBrokerMetrics(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
INVALID_BROKER_ID,
|
||||||
|
KafkaMetricsCollections.BROKER_ANALYSIS_METRICS);
|
||||||
|
Assert.assertNull(brokerMetrics2);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getBrokerMetrics2mbeanV2ListEmptyTest() {
|
||||||
|
BrokerMetrics brokerMetrics2 = jmxService.getBrokerMetrics(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
REAL_BROKER_ID_IN_ZK,
|
||||||
|
-1);
|
||||||
|
Assert.assertNotNull(brokerMetrics2);
|
||||||
|
Assert.assertEquals(brokerMetrics2.getClusterId(), REAL_CLUSTER_ID_IN_MYSQL);
|
||||||
|
Assert.assertEquals(brokerMetrics2.getBrokerId(), REAL_BROKER_ID_IN_ZK);
|
||||||
|
Assert.assertTrue(brokerMetrics2.getMetricsMap().isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getBrokerMetrics2SuccessTest() {
|
||||||
|
BrokerMetrics brokerMetrics2 = jmxService.getBrokerMetrics(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
REAL_BROKER_ID_IN_ZK,
|
||||||
|
KafkaMetricsCollections.BROKER_ANALYSIS_METRICS);
|
||||||
|
Assert.assertNotNull(brokerMetrics2);
|
||||||
|
Assert.assertEquals(brokerMetrics2.getClusterId(), REAL_CLUSTER_ID_IN_MYSQL);
|
||||||
|
Assert.assertEquals(brokerMetrics2.getBrokerId(), REAL_BROKER_ID_IN_ZK);
|
||||||
|
Assert.assertFalse(brokerMetrics2.getMetricsMap().isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void getTopicMetricsWithBrokerIdTest() {
|
||||||
|
// 结果为空
|
||||||
|
getTopicMetricsWithBrokerId2nullTest();
|
||||||
|
// 获取的metrics为空
|
||||||
|
getTopicMetricsWithBrokerId2MetricsIsNullTest();
|
||||||
|
// 获取指标成功
|
||||||
|
getTopicMetricsWithBrokerId2SuccessTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicMetricsWithBrokerId2nullTest() {
|
||||||
|
TopicMetrics topicMetrics1 = jmxService.getTopicMetrics(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
REAL_BROKER_ID_IN_ZK,
|
||||||
|
REAL_TOPIC1_IN_ZK,
|
||||||
|
-1, true);
|
||||||
|
Assert.assertNull(topicMetrics1);
|
||||||
|
|
||||||
|
TopicMetrics topicMetrics2 = jmxService.getTopicMetrics(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
INVALID_BROKER_ID,
|
||||||
|
REAL_TOPIC1_IN_ZK,
|
||||||
|
KafkaMetricsCollections.BROKER_ANALYSIS_METRICS
|
||||||
|
, true);
|
||||||
|
Assert.assertNull(topicMetrics2);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicMetricsWithBrokerId2MetricsIsNullTest() {
|
||||||
|
// brokerId为3,不在该topic下
|
||||||
|
TopicMetrics topicMetrics2 = jmxService.getTopicMetrics(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
3,
|
||||||
|
REAL_TOPIC1_IN_ZK,
|
||||||
|
KafkaMetricsCollections.BROKER_ANALYSIS_METRICS
|
||||||
|
, true);
|
||||||
|
Assert.assertNotNull(topicMetrics2);
|
||||||
|
Assert.assertEquals(topicMetrics2.getClusterId(), REAL_CLUSTER_ID_IN_MYSQL);
|
||||||
|
Assert.assertEquals(topicMetrics2.getTopicName(), REAL_TOPIC1_IN_ZK);
|
||||||
|
Assert.assertTrue(topicMetrics2.getMetricsMap().isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicMetricsWithBrokerId2SuccessTest() {
|
||||||
|
TopicMetrics topicMetrics2 = jmxService.getTopicMetrics(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
REAL_BROKER_ID_IN_ZK,
|
||||||
|
REAL_TOPIC1_IN_ZK,
|
||||||
|
KafkaMetricsCollections.TOPIC_REQUEST_TIME_METRICS_TO_DB
|
||||||
|
, true);
|
||||||
|
Assert.assertNotNull(topicMetrics2);
|
||||||
|
Assert.assertEquals(topicMetrics2.getClusterId(), REAL_CLUSTER_ID_IN_MYSQL);
|
||||||
|
Assert.assertEquals(topicMetrics2.getTopicName(), REAL_TOPIC1_IN_ZK);
|
||||||
|
Assert.assertFalse(topicMetrics2.getMetricsMap().isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void getTopicMetricsWithoutBrokerId() {
|
||||||
|
// 返回为空
|
||||||
|
getTopicMetricsWithoutBrokerId2Null();
|
||||||
|
// add
|
||||||
|
getTopicMetricsWithoutBrokerId2Add();
|
||||||
|
// max
|
||||||
|
getTopicMetricsWithoutBrokerId2Max();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicMetricsWithoutBrokerId2Null() {
|
||||||
|
TopicMetrics topicMetrics = jmxService.getTopicMetrics(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
INVALID_TOPIC,
|
||||||
|
KafkaMetricsCollections.TOPIC_METRICS_TO_DB
|
||||||
|
, true);
|
||||||
|
Assert.assertNull(topicMetrics);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicMetricsWithoutBrokerId2Add() {
|
||||||
|
TopicMetrics topicMetrics = jmxService.getTopicMetrics(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
REAL_TOPIC1_IN_ZK,
|
||||||
|
KafkaMetricsCollections.TOPIC_REQUEST_TIME_METRICS_TO_DB
|
||||||
|
, true);
|
||||||
|
Assert.assertNotNull(topicMetrics);
|
||||||
|
Assert.assertNotNull(topicMetrics.getBrokerMetricsList());
|
||||||
|
Assert.assertNotNull(topicMetrics.getMetricsMap());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicMetricsWithoutBrokerId2Max() {
|
||||||
|
TopicMetrics topicMetrics = jmxService.getTopicMetrics(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
REAL_TOPIC2_IN_ZK,
|
||||||
|
KafkaMetricsCollections.TOPIC_REQUEST_TIME_METRICS_TO_DB
|
||||||
|
, false);
|
||||||
|
Assert.assertNotNull(topicMetrics);
|
||||||
|
Assert.assertNotNull(topicMetrics.getBrokerMetricsList());
|
||||||
|
Assert.assertNotNull(topicMetrics.getMetricsMap());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试获取集群下所有topic指标")
|
||||||
|
public void getTopicMetricsList() {
|
||||||
|
List<TopicMetrics> topicMetrics = jmxService.getTopicMetrics(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
KafkaMetricsCollections.TOPIC_REQUEST_TIME_METRICS_TO_DB
|
||||||
|
, false);
|
||||||
|
Assert.assertFalse(topicMetrics.isEmpty());
|
||||||
|
Assert.assertTrue(topicMetrics.stream().allMatch(topicMetric ->
|
||||||
|
topicMetric.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试获取broker版本")
|
||||||
|
public void getBrokerVersion() {
|
||||||
|
// 结果为空
|
||||||
|
getBrokerVersion2Empty();
|
||||||
|
// 结果不为空
|
||||||
|
getBrokerVersion2NotEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getBrokerVersion2Empty() {
|
||||||
|
String brokerVersion = jmxService.getBrokerVersion(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
INVALID_BROKER_ID);
|
||||||
|
Assert.assertEquals(brokerVersion, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getBrokerVersion2NotEmpty() {
|
||||||
|
String brokerVersion = jmxService.getBrokerVersion(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
REAL_BROKER_ID_IN_ZK);
|
||||||
|
Assert.assertNotEquals(brokerVersion, "");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "获取客户端限流信息")
|
||||||
|
public void getTopicAppThrottleTest() {
|
||||||
|
// 结果为0
|
||||||
|
getTopicAppThrottle2ZeroTest();
|
||||||
|
// 结果不为0
|
||||||
|
getTopicAppThrottle2NotZeroTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicAppThrottle2ZeroTest() {
|
||||||
|
double topicAppThrottle = jmxService.getTopicAppThrottle(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
INVALID_BROKER_ID,
|
||||||
|
"1",
|
||||||
|
KafkaClientEnum.FETCH_CLIENT);
|
||||||
|
Assert.assertEquals(topicAppThrottle, 0.0d);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicAppThrottle2NotZeroTest() {
|
||||||
|
double topicAppThrottle = jmxService.getTopicAppThrottle(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
REAL_BROKER_ID_IN_ZK,
|
||||||
|
CLIENT_ID,
|
||||||
|
KafkaClientEnum.FETCH_CLIENT);
|
||||||
|
// 未设置限流,所以还是为0
|
||||||
|
Assert.assertEquals(topicAppThrottle, 0.0d);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "获取被限流信息")
|
||||||
|
public void getBrokerThrottleClientsTest() {
|
||||||
|
// 结果为空
|
||||||
|
getBrokerThrottleClients2EmptyTest();
|
||||||
|
// 构造限流client,返回结果不为空
|
||||||
|
getBrokerThrottleClients2NotEmptyTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getBrokerThrottleClients2EmptyTest() {
|
||||||
|
Set<String> brokerThrottleClients = jmxService.getBrokerThrottleClients(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
INVALID_BROKER_ID,
|
||||||
|
KafkaClientEnum.FETCH_CLIENT);
|
||||||
|
Assert.assertTrue(brokerThrottleClients.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getBrokerThrottleClients2NotEmptyTest() {
|
||||||
|
Set<String> brokerThrottleClients = jmxService.getBrokerThrottleClients(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
REAL_BROKER_ID_IN_ZK,
|
||||||
|
KafkaClientEnum.FETCH_CLIENT);
|
||||||
|
Assert.assertFalse(brokerThrottleClients.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试获取topic消息压缩指标")
|
||||||
|
public void getTopicCodeCValueTest() {
|
||||||
|
// 结果为null
|
||||||
|
getTopicCodeCValue2NullTest();
|
||||||
|
// 结果不为null
|
||||||
|
getTopicCodeCValue2SuccessTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicCodeCValue2NullTest() {
|
||||||
|
String result = jmxService.getTopicCodeCValue(REAL_CLUSTER_ID_IN_MYSQL, INVALID_TOPIC);
|
||||||
|
Assert.assertNull(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicCodeCValue2SuccessTest() {
|
||||||
|
String result = jmxService.getTopicCodeCValue(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
REAL_TOPIC2_IN_ZK);
|
||||||
|
Assert.assertNotNull(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试从JMX中获取appId维度的的流量信息")
|
||||||
|
public void getTopicAppMetricsTest() {
|
||||||
|
// result is empty
|
||||||
|
getTopicAppMetrics2Empty();
|
||||||
|
// result is not empty
|
||||||
|
getTopicAppMetrics2NotEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicAppMetrics2Empty() {
|
||||||
|
List<TopicMetrics> topicAppMetrics = jmxService.getTopicAppMetrics(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
INVALID_METRICS_CODE);
|
||||||
|
Assert.assertTrue(topicAppMetrics.isEmpty());
|
||||||
|
|
||||||
|
List<TopicMetrics> topicAppMetrics2 = jmxService.getTopicAppMetrics(
|
||||||
|
INVALID_CLUSTER_ID,
|
||||||
|
KafkaMetricsCollections.APP_TOPIC_METRICS_TO_DB);
|
||||||
|
Assert.assertTrue(topicAppMetrics2.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicAppMetrics2NotEmpty() {
|
||||||
|
List<TopicMetrics> topicAppMetrics = jmxService.getTopicAppMetrics(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
KafkaMetricsCollections.APP_TOPIC_METRICS_TO_DB
|
||||||
|
);
|
||||||
|
Assert.assertFalse(topicAppMetrics.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void getBrokerTopicLocationTest() {
|
||||||
|
// result is empty
|
||||||
|
getBrokerTopicLocation2EmptyTest();
|
||||||
|
// result is not empty
|
||||||
|
getBrokerTopicLocation2NotEmptyTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getBrokerTopicLocation2EmptyTest() {
|
||||||
|
Map<TopicPartition, String> brokerTopicLocation = jmxService.getBrokerTopicLocation(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
INVALID_BROKER_ID
|
||||||
|
);
|
||||||
|
Assert.assertTrue(brokerTopicLocation.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getBrokerTopicLocation2NotEmptyTest() {
|
||||||
|
Map<TopicPartition, String> brokerTopicLocation = jmxService.getBrokerTopicLocation(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
2
|
||||||
|
);
|
||||||
|
Assert.assertFalse(brokerTopicLocation.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void getPartitionAttributeTest() {
|
||||||
|
// result is empty
|
||||||
|
getPartitionAttribute2EmptyTest();
|
||||||
|
// result is not empty
|
||||||
|
getPartitionAttribute2NotEmptyTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getPartitionAttribute2EmptyTest() {
|
||||||
|
Map<Integer, PartitionAttributeDTO> list = jmxService.getPartitionAttribute(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
REAL_TOPIC2_IN_ZK,
|
||||||
|
Collections.emptyList());
|
||||||
|
Assert.assertTrue(list.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getPartitionAttribute2NotEmptyTest() {
|
||||||
|
// 需要确定leader所在broker
|
||||||
|
PartitionState partitionState1 = getPartitionState();
|
||||||
|
PartitionState partitionState2 = getPartitionState();
|
||||||
|
partitionState2.setLeader(3);
|
||||||
|
partitionState2.setPartitionId(1);
|
||||||
|
|
||||||
|
Map<Integer, PartitionAttributeDTO> list = jmxService.getPartitionAttribute(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
REAL_TOPIC2_IN_ZK,
|
||||||
|
Arrays.asList(partitionState1, partitionState1, partitionState2)
|
||||||
|
);
|
||||||
|
Assert.assertFalse(list.isEmpty());
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -31,6 +31,10 @@ import java.util.*;
|
|||||||
*/
|
*/
|
||||||
public class LogicalClusterServiceTest extends BaseTest {
|
public class LogicalClusterServiceTest extends BaseTest {
|
||||||
|
|
||||||
|
private final static Long INVALID_CLUSTER_ID = -1L;
|
||||||
|
|
||||||
|
private final static Long REAL_CLUSTER_ID_IN_MYSQL = 1L;
|
||||||
|
|
||||||
@Autowired
|
@Autowired
|
||||||
@InjectMocks
|
@InjectMocks
|
||||||
private LogicalClusterService logicalClusterService;
|
private LogicalClusterService logicalClusterService;
|
||||||
@@ -52,8 +56,8 @@ public class LogicalClusterServiceTest extends BaseTest {
|
|||||||
@DataProvider(name = "provideLogicalClusterDO")
|
@DataProvider(name = "provideLogicalClusterDO")
|
||||||
public Object[][] provideLogicalClusterDO() {
|
public Object[][] provideLogicalClusterDO() {
|
||||||
LogicalClusterDO logicalClusterDO = new LogicalClusterDO();
|
LogicalClusterDO logicalClusterDO = new LogicalClusterDO();
|
||||||
logicalClusterDO.setId(100L);
|
logicalClusterDO.setId(INVALID_CLUSTER_ID);
|
||||||
logicalClusterDO.setClusterId(1L);
|
logicalClusterDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
|
||||||
logicalClusterDO.setIdentification("moduleTestLogicalCluster");
|
logicalClusterDO.setIdentification("moduleTestLogicalCluster");
|
||||||
logicalClusterDO.setName("moduleTestLogicalCluster");
|
logicalClusterDO.setName("moduleTestLogicalCluster");
|
||||||
logicalClusterDO.setMode(1);
|
logicalClusterDO.setMode(1);
|
||||||
@@ -66,8 +70,8 @@ public class LogicalClusterServiceTest extends BaseTest {
|
|||||||
|
|
||||||
private LogicalClusterDO getLogicalClusterDO() {
|
private LogicalClusterDO getLogicalClusterDO() {
|
||||||
LogicalClusterDO logicalClusterDO = new LogicalClusterDO();
|
LogicalClusterDO logicalClusterDO = new LogicalClusterDO();
|
||||||
logicalClusterDO.setId(100L);
|
logicalClusterDO.setId(INVALID_CLUSTER_ID);
|
||||||
logicalClusterDO.setClusterId(1L);
|
logicalClusterDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
|
||||||
logicalClusterDO.setIdentification("moduleTestLogicalCluster");
|
logicalClusterDO.setIdentification("moduleTestLogicalCluster");
|
||||||
logicalClusterDO.setName("moduleTestLogicalCluster");
|
logicalClusterDO.setName("moduleTestLogicalCluster");
|
||||||
logicalClusterDO.setMode(0);
|
logicalClusterDO.setMode(0);
|
||||||
@@ -120,7 +124,7 @@ public class LogicalClusterServiceTest extends BaseTest {
|
|||||||
Assert.assertEquals(result1.getCode(), ResultStatus.RESOURCE_ALREADY_USED.getCode());
|
Assert.assertEquals(result1.getCode(), ResultStatus.RESOURCE_ALREADY_USED.getCode());
|
||||||
|
|
||||||
// regionList为空情况
|
// regionList为空情况
|
||||||
logicalClusterDO.setClusterId(1L);
|
logicalClusterDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
|
||||||
logicalClusterDO.setRegionList("");
|
logicalClusterDO.setRegionList("");
|
||||||
ResultStatus result2 = logicalClusterService.createLogicalCluster(logicalClusterDO);
|
ResultStatus result2 = logicalClusterService.createLogicalCluster(logicalClusterDO);
|
||||||
Assert.assertEquals(result2.getCode(), ResultStatus.RESOURCE_ALREADY_USED.getCode());
|
Assert.assertEquals(result2.getCode(), ResultStatus.RESOURCE_ALREADY_USED.getCode());
|
||||||
@@ -135,7 +139,7 @@ public class LogicalClusterServiceTest extends BaseTest {
|
|||||||
LogicalClusterDO logicalClusterDO = getLogicalClusterDO();
|
LogicalClusterDO logicalClusterDO = getLogicalClusterDO();
|
||||||
Mockito.when(logicalClusterDao.insert(Mockito.any())).thenReturn(1);
|
Mockito.when(logicalClusterDao.insert(Mockito.any())).thenReturn(1);
|
||||||
// 不存在该物理集群情况
|
// 不存在该物理集群情况
|
||||||
logicalClusterDO.setClusterId(100L);
|
logicalClusterDO.setClusterId(INVALID_CLUSTER_ID);
|
||||||
ResultStatus result1 = logicalClusterService.createLogicalCluster(logicalClusterDO);
|
ResultStatus result1 = logicalClusterService.createLogicalCluster(logicalClusterDO);
|
||||||
Assert.assertNotEquals(result1.getCode(), ResultStatus.RESOURCE_ALREADY_USED.getCode());
|
Assert.assertNotEquals(result1.getCode(), ResultStatus.RESOURCE_ALREADY_USED.getCode());
|
||||||
Assert.assertEquals(result1.getCode(), ResultStatus.SUCCESS.getCode());
|
Assert.assertEquals(result1.getCode(), ResultStatus.SUCCESS.getCode());
|
||||||
@@ -205,7 +209,7 @@ public class LogicalClusterServiceTest extends BaseTest {
|
|||||||
private void deleteById2ResourceNotExistTest() {
|
private void deleteById2ResourceNotExistTest() {
|
||||||
Mockito.when(logicalClusterDao.deleteById(Mockito.anyLong())).thenReturn(-1);
|
Mockito.when(logicalClusterDao.deleteById(Mockito.anyLong())).thenReturn(-1);
|
||||||
|
|
||||||
ResultStatus resultStatus = logicalClusterService.deleteById(100L);
|
ResultStatus resultStatus = logicalClusterService.deleteById(INVALID_CLUSTER_ID);
|
||||||
Assert.assertEquals(resultStatus.getCode(), ResultStatus.RESOURCE_NOT_EXIST.getCode());
|
Assert.assertEquals(resultStatus.getCode(), ResultStatus.RESOURCE_NOT_EXIST.getCode());
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -235,7 +239,7 @@ public class LogicalClusterServiceTest extends BaseTest {
|
|||||||
|
|
||||||
@Test(dataProvider = "provideLogicalClusterDO", description = "修改集群时无对应逻辑集群")
|
@Test(dataProvider = "provideLogicalClusterDO", description = "修改集群时无对应逻辑集群")
|
||||||
public void updateById2ResourceNotExistTest(LogicalClusterDO logicalClusterDO) {
|
public void updateById2ResourceNotExistTest(LogicalClusterDO logicalClusterDO) {
|
||||||
logicalClusterDO.setId(100L);
|
logicalClusterDO.setId(INVALID_CLUSTER_ID);
|
||||||
ResultStatus resultStatus2 = logicalClusterService.updateById(logicalClusterDO);
|
ResultStatus resultStatus2 = logicalClusterService.updateById(logicalClusterDO);
|
||||||
Assert.assertEquals(resultStatus2.getCode(), ResultStatus.RESOURCE_NOT_EXIST.getCode());
|
Assert.assertEquals(resultStatus2.getCode(), ResultStatus.RESOURCE_NOT_EXIST.getCode());
|
||||||
}
|
}
|
||||||
@@ -250,7 +254,7 @@ public class LogicalClusterServiceTest extends BaseTest {
|
|||||||
Assert.assertEquals(result1.getCode(), ResultStatus.RESOURCE_ALREADY_USED.getCode());
|
Assert.assertEquals(result1.getCode(), ResultStatus.RESOURCE_ALREADY_USED.getCode());
|
||||||
|
|
||||||
// regionList为空情况
|
// regionList为空情况
|
||||||
logicalClusterDO.setClusterId(1L);
|
logicalClusterDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
|
||||||
logicalClusterDO.setRegionList("");
|
logicalClusterDO.setRegionList("");
|
||||||
ResultStatus result2 = logicalClusterService.updateById(logicalClusterDO);
|
ResultStatus result2 = logicalClusterService.updateById(logicalClusterDO);
|
||||||
Assert.assertEquals(result2.getCode(), ResultStatus.RESOURCE_ALREADY_USED.getCode());
|
Assert.assertEquals(result2.getCode(), ResultStatus.RESOURCE_ALREADY_USED.getCode());
|
||||||
@@ -315,7 +319,7 @@ public class LogicalClusterServiceTest extends BaseTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void getLogicalCluster2NullTest() {
|
private void getLogicalCluster2NullTest() {
|
||||||
LogicalCluster logicalCluster = logicalClusterService.getLogicalCluster(100L);
|
LogicalCluster logicalCluster = logicalClusterService.getLogicalCluster(INVALID_CLUSTER_ID);
|
||||||
Assert.assertNull(logicalCluster);
|
Assert.assertNull(logicalCluster);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,458 @@
|
|||||||
|
package com.xiaojukeji.kafka.manager.service.service;
|
||||||
|
|
||||||
|
import com.xiaojukeji.kafka.manager.common.bizenum.TaskStatusReassignEnum;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.bizenum.TopicReassignActionEnum;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.ao.reassign.ReassignStatus;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.dto.op.reassign.ReassignExecDTO;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.dto.op.reassign.ReassignExecSubDTO;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.dto.op.reassign.ReassignTopicDTO;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.pojo.ReassignTaskDO;
|
||||||
|
import com.xiaojukeji.kafka.manager.dao.ReassignTaskDao;
|
||||||
|
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
|
||||||
|
import kafka.common.TopicAndPartition;
|
||||||
|
import org.mockito.InjectMocks;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.MockitoAnnotations;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.testng.Assert;
|
||||||
|
import org.testng.annotations.BeforeMethod;
|
||||||
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author xuguang
|
||||||
|
* @Date 2021/12/14
|
||||||
|
*/
|
||||||
|
public class ReassignServiceTest extends BaseTest {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 集群共包括三个broker:1,2,3, 该topic 2分区 3副本因子,在broker1,2,3上
|
||||||
|
*/
|
||||||
|
private final static String REAL_TOPIC2_IN_ZK = "xgTest";
|
||||||
|
|
||||||
|
private final static String ADMIN_OPERATOR = "admin";
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
@InjectMocks
|
||||||
|
private ReassignService reassignService;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private RegionService regionService;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private ClusterService clusterService;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private ReassignTaskDao reassignTaskDao;
|
||||||
|
|
||||||
|
@BeforeMethod
|
||||||
|
public void setup() {
|
||||||
|
MockitoAnnotations.initMocks(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
private final static String ZOOKEEPER_ADDRESS = "10.190.46.198:2181,10.190.14.237:2181,10.190.50.65:2181/xg";
|
||||||
|
|
||||||
|
private final static String REASSIGNMENTJSON =
|
||||||
|
"{ \"version\": 1, \"partitions\": [ { \"topic\": \"reassignTest\", \"partition\": 1, \"replicas\": [ 1,2,3 ], \"log_dirs\": [ \"any\",\"any\",\"any\" ] }, { \"topic\": \"reassignTest\", \"partition\": 0, \"replicas\": [ 1,2,3 ], \"log_dirs\": [ \"any\",\"any\",\"any\" ] } ] }";
|
||||||
|
|
||||||
|
private ReassignTopicDTO getReassignTopicDTO() {
|
||||||
|
ReassignTopicDTO reassignTopicDTO = new ReassignTopicDTO();
|
||||||
|
reassignTopicDTO.setClusterId(1L);
|
||||||
|
reassignTopicDTO.setTopicName(REAL_TOPIC2_IN_ZK);
|
||||||
|
reassignTopicDTO.setBrokerIdList(Arrays.asList(2,3));
|
||||||
|
reassignTopicDTO.setRegionId(2L);
|
||||||
|
reassignTopicDTO.setPartitionIdList(Arrays.asList(0, 1));
|
||||||
|
reassignTopicDTO.setThrottle(100000L);
|
||||||
|
reassignTopicDTO.setMaxThrottle(100000L);
|
||||||
|
reassignTopicDTO.setMinThrottle(100000L);
|
||||||
|
reassignTopicDTO.setOriginalRetentionTime(10000L);
|
||||||
|
reassignTopicDTO.setReassignRetentionTime(10000L);
|
||||||
|
reassignTopicDTO.setBeginTime(100000L);
|
||||||
|
reassignTopicDTO.setDescription("");
|
||||||
|
return reassignTopicDTO;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ReassignExecDTO getReassignExecDTO() {
|
||||||
|
ReassignExecDTO reassignExecDTO = new ReassignExecDTO();
|
||||||
|
reassignExecDTO.setTaskId(1L);
|
||||||
|
reassignExecDTO.setAction("modify");
|
||||||
|
reassignExecDTO.setBeginTime(0L);
|
||||||
|
return reassignExecDTO;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ReassignTaskDO getReassignTaskDO() {
|
||||||
|
ReassignTaskDO reassignTaskDO = new ReassignTaskDO();
|
||||||
|
reassignTaskDO.setId(1L);
|
||||||
|
reassignTaskDO.setClusterId(1L);
|
||||||
|
reassignTaskDO.setStatus(0);
|
||||||
|
reassignTaskDO.setTaskId(1L);
|
||||||
|
reassignTaskDO.setTopicName(REAL_TOPIC2_IN_ZK);
|
||||||
|
reassignTaskDO.setPartitions("0,1,2");
|
||||||
|
reassignTaskDO.setReassignmentJson("");
|
||||||
|
reassignTaskDO.setRealThrottle(1000L);
|
||||||
|
reassignTaskDO.setMaxThrottle(1000L);
|
||||||
|
reassignTaskDO.setMinThrottle(1000L);
|
||||||
|
reassignTaskDO.setBeginTime(new Date());
|
||||||
|
reassignTaskDO.setSrcBrokers("0");
|
||||||
|
reassignTaskDO.setDestBrokers("1");
|
||||||
|
reassignTaskDO.setReassignRetentionTime(1000L);
|
||||||
|
reassignTaskDO.setOriginalRetentionTime(1000L);
|
||||||
|
reassignTaskDO.setDescription("测试迁移任务");
|
||||||
|
reassignTaskDO.setOperator(ADMIN_OPERATOR);
|
||||||
|
return reassignTaskDO;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ReassignExecSubDTO getReassignExecSubDTO() {
|
||||||
|
ReassignExecSubDTO reassignExecSubDTO = new ReassignExecSubDTO();
|
||||||
|
reassignExecSubDTO.setSubTaskId(1L);
|
||||||
|
reassignExecSubDTO.setAction("modify");
|
||||||
|
reassignExecSubDTO.setThrottle(100000L);
|
||||||
|
reassignExecSubDTO.setMaxThrottle(100000L);
|
||||||
|
reassignExecSubDTO.setMinThrottle(100000L);
|
||||||
|
return reassignExecSubDTO;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ClusterDO getClusterDO() {
|
||||||
|
ClusterDO clusterDO = new ClusterDO();
|
||||||
|
clusterDO.setId(1L);
|
||||||
|
clusterDO.setClusterName("LogiKM_moduleTest");
|
||||||
|
clusterDO.setZookeeper("10.190.46.198:2181,10.190.14.237:2181,10.190.50.65:2181/xg");
|
||||||
|
clusterDO.setBootstrapServers("10.190.46.198:9093,10.190.14.237:9093,10.190.50.65:9093");
|
||||||
|
clusterDO.setSecurityProperties("{ \t\"security.protocol\": \"SASL_PLAINTEXT\", \t\"sasl.mechanism\": \"PLAIN\", \t\"sasl.jaas.config\": \"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\"dkm_admin\\\" password=\\\"km_kMl4N8as1Kp0CCY\\\";\" }");
|
||||||
|
clusterDO.setStatus(1);
|
||||||
|
clusterDO.setGmtCreate(new Date());
|
||||||
|
clusterDO.setGmtModify(new Date());
|
||||||
|
return clusterDO;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "创建迁移任务")
|
||||||
|
public void createTaskTest() {
|
||||||
|
// 参数错误
|
||||||
|
createTask2paramIllegalTest();
|
||||||
|
// 物理集群不存在
|
||||||
|
createTask2ClusterNotExistTest();
|
||||||
|
// topic不存在
|
||||||
|
createTask2TopicNotExistTest();
|
||||||
|
// broker数量不足
|
||||||
|
createTask2BrokerNumNotEnoughTest();
|
||||||
|
// broker不存在
|
||||||
|
createTask2BrokerNotExistTest();
|
||||||
|
// broker数量不足, checkParamLegal()方法中
|
||||||
|
createTask2BrokerNumNotEnough2Test();
|
||||||
|
// 参数错误, checkParamLegal()方法中
|
||||||
|
createTask2ParamIllegal2Test();
|
||||||
|
// 分区为空
|
||||||
|
createTask2PartitionIdListEmptyTest();
|
||||||
|
// 分区不存在
|
||||||
|
createTask2PartitionNotExistTest();
|
||||||
|
// 创建任务成功
|
||||||
|
createTask2SuccessTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createTask2paramIllegalTest() {
|
||||||
|
ResultStatus result = reassignService.createTask(Collections.emptyList(), ADMIN_OPERATOR);
|
||||||
|
Assert.assertEquals(result.getCode(), ResultStatus.PARAM_ILLEGAL.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createTask2ClusterNotExistTest() {
|
||||||
|
ReassignTopicDTO reassignTopicDTO = getReassignTopicDTO();
|
||||||
|
reassignTopicDTO.setClusterId(-1L);
|
||||||
|
ResultStatus result = reassignService.createTask(Arrays.asList(reassignTopicDTO), ADMIN_OPERATOR);
|
||||||
|
Assert.assertEquals(result.getCode(), ResultStatus.CLUSTER_NOT_EXIST.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createTask2TopicNotExistTest() {
|
||||||
|
ReassignTopicDTO reassignTopicDTO = getReassignTopicDTO();
|
||||||
|
reassignTopicDTO.setClusterId(1L);
|
||||||
|
reassignTopicDTO.setTopicName("xxx");
|
||||||
|
ResultStatus result = reassignService.createTask(Arrays.asList(reassignTopicDTO), ADMIN_OPERATOR);
|
||||||
|
Assert.assertEquals(result.getCode(), ResultStatus.TOPIC_NOT_EXIST.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createTask2BrokerNumNotEnoughTest() {
|
||||||
|
Mockito.when(regionService.getFullBrokerIdList(
|
||||||
|
Mockito.anyLong(), Mockito.anyLong(), Mockito.anyList())).thenReturn(null);
|
||||||
|
|
||||||
|
ReassignTopicDTO reassignTopicDTO = getReassignTopicDTO();
|
||||||
|
reassignTopicDTO.setClusterId(1L);
|
||||||
|
reassignTopicDTO.setTopicName(REAL_TOPIC2_IN_ZK);
|
||||||
|
ResultStatus result = reassignService.createTask(Arrays.asList(reassignTopicDTO), ADMIN_OPERATOR);
|
||||||
|
Assert.assertEquals(result.getCode(), ResultStatus.BROKER_NUM_NOT_ENOUGH.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createTask2BrokerNotExistTest() {
|
||||||
|
Mockito.when(regionService.getFullBrokerIdList(
|
||||||
|
Mockito.anyLong(), Mockito.anyLong(), Mockito.anyList())).thenReturn(Arrays.asList(100, 2, 3));
|
||||||
|
|
||||||
|
ReassignTopicDTO reassignTopicDTO = getReassignTopicDTO();
|
||||||
|
reassignTopicDTO.setClusterId(1L);
|
||||||
|
reassignTopicDTO.setTopicName(REAL_TOPIC2_IN_ZK);
|
||||||
|
ResultStatus result = reassignService.createTask(Arrays.asList(reassignTopicDTO), ADMIN_OPERATOR);
|
||||||
|
Assert.assertEquals(result.getCode(), ResultStatus.BROKER_NOT_EXIST.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createTask2BrokerNumNotEnough2Test() {
|
||||||
|
Mockito.when(regionService.getFullBrokerIdList(
|
||||||
|
Mockito.anyLong(), Mockito.anyLong(), Mockito.anyList())).thenReturn(Arrays.asList(2, 3));
|
||||||
|
|
||||||
|
ReassignTopicDTO reassignTopicDTO = getReassignTopicDTO();
|
||||||
|
reassignTopicDTO.setClusterId(1L);
|
||||||
|
reassignTopicDTO.setTopicName(REAL_TOPIC2_IN_ZK);
|
||||||
|
ResultStatus result = reassignService.createTask(Arrays.asList(reassignTopicDTO), ADMIN_OPERATOR);
|
||||||
|
Assert.assertEquals(result.getCode(), ResultStatus.BROKER_NUM_NOT_ENOUGH.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createTask2ParamIllegal2Test() {
|
||||||
|
Mockito.when(regionService.getFullBrokerIdList(
|
||||||
|
Mockito.anyLong(), Mockito.anyLong(), Mockito.anyList())).thenReturn(Arrays.asList(1, 2, 3));
|
||||||
|
|
||||||
|
ReassignTopicDTO reassignTopicDTO = getReassignTopicDTO();
|
||||||
|
reassignTopicDTO.setClusterId(1L);
|
||||||
|
reassignTopicDTO.setTopicName(REAL_TOPIC2_IN_ZK);
|
||||||
|
ResultStatus result = reassignService.createTask(Arrays.asList(reassignTopicDTO), ADMIN_OPERATOR);
|
||||||
|
Assert.assertEquals(result.getCode(), ResultStatus.PARAM_ILLEGAL.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createTask2PartitionIdListEmptyTest() {
|
||||||
|
Mockito.when(regionService.getFullBrokerIdList(
|
||||||
|
Mockito.anyLong(), Mockito.anyLong(), Mockito.anyList())).thenReturn(Arrays.asList(1, 2, 3));
|
||||||
|
|
||||||
|
ReassignTopicDTO reassignTopicDTO = getReassignTopicDTO();
|
||||||
|
reassignTopicDTO.setClusterId(1L);
|
||||||
|
reassignTopicDTO.setTopicName(REAL_TOPIC2_IN_ZK);
|
||||||
|
reassignTopicDTO.setOriginalRetentionTime(168 * 3600000L);
|
||||||
|
reassignTopicDTO.setPartitionIdList(Collections.emptyList());
|
||||||
|
ResultStatus result = reassignService.createTask(Arrays.asList(reassignTopicDTO), ADMIN_OPERATOR);
|
||||||
|
Assert.assertEquals(result.getCode(), ResultStatus.PARAM_ILLEGAL.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createTask2PartitionNotExistTest() {
|
||||||
|
Mockito.when(regionService.getFullBrokerIdList(
|
||||||
|
Mockito.anyLong(), Mockito.anyLong(), Mockito.anyList())).thenReturn(Arrays.asList(1, 2, 3));
|
||||||
|
|
||||||
|
ReassignTopicDTO reassignTopicDTO = getReassignTopicDTO();
|
||||||
|
reassignTopicDTO.setClusterId(1L);
|
||||||
|
reassignTopicDTO.setTopicName(REAL_TOPIC2_IN_ZK);
|
||||||
|
reassignTopicDTO.setOriginalRetentionTime(168 * 3600000L);
|
||||||
|
reassignTopicDTO.setPartitionIdList(Arrays.asList(100, 0));
|
||||||
|
ResultStatus result = reassignService.createTask(Arrays.asList(reassignTopicDTO), ADMIN_OPERATOR);
|
||||||
|
Assert.assertEquals(result.getCode(), ResultStatus.PARTITION_NOT_EXIST.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createTask2SuccessTest() {
|
||||||
|
Mockito.when(regionService.getFullBrokerIdList(
|
||||||
|
Mockito.anyLong(), Mockito.anyLong(), Mockito.anyList())).thenReturn(Arrays.asList(1, 2, 3));
|
||||||
|
|
||||||
|
ReassignTopicDTO reassignTopicDTO = getReassignTopicDTO();
|
||||||
|
reassignTopicDTO.setTopicName(REAL_TOPIC2_IN_ZK);
|
||||||
|
reassignTopicDTO.setOriginalRetentionTime(168 * 3600000L);
|
||||||
|
ResultStatus result = reassignService.createTask(Arrays.asList(reassignTopicDTO), ADMIN_OPERATOR);
|
||||||
|
Assert.assertEquals(result.getCode(), ResultStatus.SUCCESS.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试获取迁移任务")
|
||||||
|
public void getTaskTest() {
|
||||||
|
// 测试获取成功
|
||||||
|
getTaskTest2Success();
|
||||||
|
// 测试获取失败
|
||||||
|
getTaskTest2Exception();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTaskTest2Success() {
|
||||||
|
ReassignTaskDO reassignTask = getReassignTaskDO();
|
||||||
|
Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenReturn(Arrays.asList(reassignTask));
|
||||||
|
|
||||||
|
List<ReassignTaskDO> task = reassignService.getTask(reassignTask.getTaskId());
|
||||||
|
Assert.assertFalse(task.isEmpty());
|
||||||
|
Assert.assertTrue(task.stream().allMatch(reassignTaskDO ->
|
||||||
|
reassignTaskDO.getTaskId().equals(reassignTask.getTaskId()) &&
|
||||||
|
reassignTaskDO.getClusterId().equals(reassignTask.getClusterId()) &&
|
||||||
|
reassignTaskDO.getStatus().equals(reassignTask.getStatus())));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTaskTest2Exception() {
|
||||||
|
ReassignTaskDO reassignTask = getReassignTaskDO();
|
||||||
|
Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenThrow(RuntimeException.class);
|
||||||
|
|
||||||
|
List<ReassignTaskDO> task = reassignService.getTask(reassignTask.getTaskId());
|
||||||
|
Assert.assertNull(task);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "修改迁移任务")
|
||||||
|
public void modifyTask() {
|
||||||
|
// operation forbidden
|
||||||
|
modifyTask2OperationForbiddenTest();
|
||||||
|
// 修改成功
|
||||||
|
modifyTask2Success();
|
||||||
|
// mysqlError
|
||||||
|
modifyTask2MysqlError();
|
||||||
|
// 任务不存在
|
||||||
|
modifyTask2TaskNotExistTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void modifyTask2TaskNotExistTest() {
|
||||||
|
Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenThrow(RuntimeException.class);
|
||||||
|
|
||||||
|
ReassignExecDTO reassignExecDTO = getReassignExecDTO();
|
||||||
|
reassignExecDTO.setTaskId(100L);
|
||||||
|
ResultStatus resultStatus = reassignService.modifyTask(reassignExecDTO, TopicReassignActionEnum.START);
|
||||||
|
Assert.assertEquals(resultStatus.getCode(), ResultStatus.TASK_NOT_EXIST.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void modifyTask2OperationForbiddenTest() {
|
||||||
|
ReassignTaskDO reassignTask = getReassignTaskDO();
|
||||||
|
reassignTask.setStatus(1);
|
||||||
|
Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenReturn(Arrays.asList(reassignTask));
|
||||||
|
|
||||||
|
ReassignExecDTO reassignExecDTO = getReassignExecDTO();
|
||||||
|
ResultStatus resultStatus1 = reassignService.modifyTask(reassignExecDTO, TopicReassignActionEnum.START);
|
||||||
|
Assert.assertEquals(resultStatus1.getCode(), ResultStatus.OPERATION_FORBIDDEN.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void modifyTask2Success() {
|
||||||
|
ReassignTaskDO reassignTask = getReassignTaskDO();
|
||||||
|
reassignTask.setStatus(0);
|
||||||
|
Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenReturn(Arrays.asList(reassignTask));
|
||||||
|
|
||||||
|
ReassignExecDTO reassignExecDTO = getReassignExecDTO();
|
||||||
|
// cancel action
|
||||||
|
ResultStatus resultStatus1 = reassignService.modifyTask(reassignExecDTO, TopicReassignActionEnum.CANCEL);
|
||||||
|
Assert.assertEquals(resultStatus1.getCode(), ResultStatus.SUCCESS.getCode());
|
||||||
|
|
||||||
|
// start action
|
||||||
|
reassignTask.setStatus(0);
|
||||||
|
Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenReturn(Arrays.asList(reassignTask));
|
||||||
|
ResultStatus resultStatus2 = reassignService.modifyTask(reassignExecDTO, TopicReassignActionEnum.START);
|
||||||
|
Assert.assertEquals(resultStatus2.getCode(), ResultStatus.SUCCESS.getCode());
|
||||||
|
|
||||||
|
// modify action
|
||||||
|
reassignTask.setStatus(0);
|
||||||
|
Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenReturn(Arrays.asList(reassignTask));
|
||||||
|
ResultStatus resultStatus3 = reassignService.modifyTask(reassignExecDTO, TopicReassignActionEnum.MODIFY);
|
||||||
|
Assert.assertEquals(resultStatus3.getCode(), ResultStatus.SUCCESS.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void modifyTask2MysqlError() {
|
||||||
|
ReassignTaskDO reassignTask = getReassignTaskDO();
|
||||||
|
reassignTask.setStatus(0);
|
||||||
|
Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenReturn(Arrays.asList(reassignTask));
|
||||||
|
|
||||||
|
ReassignExecDTO reassignExecDTO = getReassignExecDTO();
|
||||||
|
// cancel action
|
||||||
|
Mockito.doThrow(RuntimeException.class).when(reassignTaskDao).batchUpdate(Mockito.anyList());
|
||||||
|
ResultStatus resultStatus1 = reassignService.modifyTask(reassignExecDTO, TopicReassignActionEnum.CANCEL);
|
||||||
|
Assert.assertEquals(resultStatus1.getCode(), ResultStatus.MYSQL_ERROR.getCode());
|
||||||
|
|
||||||
|
// start action
|
||||||
|
reassignTask.setStatus(0);
|
||||||
|
Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenReturn(Arrays.asList(reassignTask));
|
||||||
|
Mockito.doThrow(RuntimeException.class).when(reassignTaskDao).batchUpdate(Mockito.anyList());
|
||||||
|
ResultStatus resultStatus2 = reassignService.modifyTask(reassignExecDTO, TopicReassignActionEnum.START);
|
||||||
|
Assert.assertEquals(resultStatus2.getCode(), ResultStatus.MYSQL_ERROR.getCode());
|
||||||
|
|
||||||
|
// modify action
|
||||||
|
reassignTask.setStatus(0);
|
||||||
|
Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenReturn(Arrays.asList(reassignTask));
|
||||||
|
Mockito.doThrow(RuntimeException.class).when(reassignTaskDao).batchUpdate(Mockito.anyList());
|
||||||
|
ResultStatus resultStatus3 = reassignService.modifyTask(reassignExecDTO, TopicReassignActionEnum.MODIFY);
|
||||||
|
Assert.assertEquals(resultStatus3.getCode(), ResultStatus.MYSQL_ERROR.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "修改子任务测试")
|
||||||
|
public void modifySubTaskTest() {
|
||||||
|
// 任务不存在
|
||||||
|
modifySubTask2TaskNotExist();
|
||||||
|
// 修改任务成功
|
||||||
|
modifySubTask2Success();
|
||||||
|
// 修改任务失败
|
||||||
|
modifySubTask2MysqlError();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void modifySubTask2TaskNotExist() {
|
||||||
|
Mockito.when(reassignTaskDao.getSubTask(Mockito.anyLong())).thenReturn(null);
|
||||||
|
ResultStatus resultStatus = reassignService.modifySubTask(new ReassignExecSubDTO());
|
||||||
|
Assert.assertEquals(resultStatus.getCode(), ResultStatus.TASK_NOT_EXIST.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void modifySubTask2Success() {
|
||||||
|
ReassignTaskDO reassignTask = getReassignTaskDO();
|
||||||
|
Mockito.when(reassignTaskDao.getSubTask(Mockito.anyLong())).thenReturn(reassignTask);
|
||||||
|
ReassignExecSubDTO reassignExecSubDTO = getReassignExecSubDTO();
|
||||||
|
ResultStatus resultStatus = reassignService.modifySubTask(reassignExecSubDTO);
|
||||||
|
Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void modifySubTask2MysqlError() {
|
||||||
|
ReassignTaskDO reassignTask = getReassignTaskDO();
|
||||||
|
Mockito.when(reassignTaskDao.getSubTask(Mockito.anyLong())).thenReturn(reassignTask);
|
||||||
|
Mockito.when(reassignTaskDao.updateById(Mockito.any())).thenThrow(RuntimeException.class);
|
||||||
|
ReassignExecSubDTO reassignExecSubDTO = getReassignExecSubDTO();
|
||||||
|
ResultStatus resultStatus = reassignService.modifySubTask(reassignExecSubDTO);
|
||||||
|
Assert.assertEquals(resultStatus.getCode(), ResultStatus.MYSQL_ERROR.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test()
|
||||||
|
public void getReassignTaskListTest() {
|
||||||
|
// 获取成功
|
||||||
|
getReassignTaskList2Success();
|
||||||
|
// 获取失败
|
||||||
|
getReassignTaskList2Empty();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getReassignTaskList2Success() {
|
||||||
|
Mockito.when(reassignTaskDao.listAll()).thenReturn(Arrays.asList(new ReassignTaskDO()));
|
||||||
|
List<ReassignTaskDO> reassignTaskList = reassignService.getReassignTaskList();
|
||||||
|
Assert.assertFalse(reassignTaskList.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getReassignTaskList2Empty() {
|
||||||
|
Mockito.when(reassignTaskDao.listAll()).thenThrow(RuntimeException.class);
|
||||||
|
List<ReassignTaskDO> reassignTaskList = reassignService.getReassignTaskList();
|
||||||
|
Assert.assertTrue(reassignTaskList.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void getReassignStatusTest() {
|
||||||
|
// 获取成功
|
||||||
|
getReassignStatus2Success();
|
||||||
|
// task不存在
|
||||||
|
getReassignStatus2TaskNotExistTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getReassignStatus2TaskNotExistTest() {
|
||||||
|
Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenThrow(RuntimeException.class);
|
||||||
|
Result<List<ReassignStatus>> reassignStatus = reassignService.getReassignStatus(1L);
|
||||||
|
Assert.assertEquals(reassignStatus.getCode(), ResultStatus.TASK_NOT_EXIST.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getReassignStatus2Success() {
|
||||||
|
ClusterDO clusterDO1 = getClusterDO();
|
||||||
|
ClusterDO clusterDO2 = getClusterDO();
|
||||||
|
clusterDO2.setId(100L);
|
||||||
|
Map<Long, ClusterDO> map = new HashMap<>();
|
||||||
|
map.put(clusterDO1.getId(), clusterDO1);
|
||||||
|
map.put(clusterDO2.getId(), clusterDO2);
|
||||||
|
Mockito.when(clusterService.listMap()).thenReturn(map);
|
||||||
|
|
||||||
|
ReassignTaskDO reassignTaskDO1 = getReassignTaskDO();
|
||||||
|
ReassignTaskDO reassignTaskDO2 = getReassignTaskDO();
|
||||||
|
reassignTaskDO2.setStatus(TaskStatusReassignEnum.RUNNING.getCode());
|
||||||
|
|
||||||
|
Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenReturn(Arrays.asList(reassignTaskDO1, reassignTaskDO2));
|
||||||
|
Result<List<ReassignStatus>> reassignStatus = reassignService.getReassignStatus(1L);
|
||||||
|
Assert.assertFalse(reassignStatus.getData().isEmpty());
|
||||||
|
Assert.assertEquals(reassignStatus.getCode(), ResultStatus.SUCCESS.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void verifyAssignmenTest() {
|
||||||
|
Map<TopicAndPartition, TaskStatusReassignEnum> map = reassignService.verifyAssignment(ZOOKEEPER_ADDRESS, REASSIGNMENTJSON);
|
||||||
|
Assert.assertFalse(map.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,741 @@
|
|||||||
|
package com.xiaojukeji.kafka.manager.service.service;
|
||||||
|
|
||||||
|
import com.alibaba.fastjson.JSON;
|
||||||
|
import com.alibaba.fastjson.JSONObject;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.bizenum.TopicOffsetChangedEnum;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.constant.TopicSampleConstant;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionAttributeDTO;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionOffsetDTO;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.ao.topic.*;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicMetricsDO;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO;
|
||||||
|
import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.PartitionState;
|
||||||
|
import com.xiaojukeji.kafka.manager.dao.TopicRequestMetricsDao;
|
||||||
|
import com.xiaojukeji.kafka.manager.service.config.BaseTest;
|
||||||
|
import com.xiaojukeji.kafka.manager.service.service.gateway.AppService;
|
||||||
|
import org.apache.kafka.common.TopicPartition;
|
||||||
|
import org.mockito.InjectMocks;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.MockitoAnnotations;
|
||||||
|
import org.springframework.beans.factory.annotation.Autowired;
|
||||||
|
import org.testng.Assert;
|
||||||
|
import org.testng.annotations.BeforeMethod;
|
||||||
|
import org.testng.annotations.Test;
|
||||||
|
|
||||||
|
import java.util.*;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @author xuguang
|
||||||
|
* @Date 2021/12/20
|
||||||
|
*/
|
||||||
|
public class TopicServiceTest extends BaseTest {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 集群共包括三个broker:1,2,3, 该topic 1分区 1副本因子,在broker1上
|
||||||
|
*/
|
||||||
|
private final static String REAL_TOPIC1_IN_ZK = "moduleTest";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 集群共包括三个broker:1,2,3, 该topic 2分区 3副本因子,在broker1,2,3上
|
||||||
|
*/
|
||||||
|
private final static String REAL_TOPIC2_IN_ZK = "xgTest";
|
||||||
|
|
||||||
|
private final static String INVALID_TOPIC = "xxxxx";
|
||||||
|
|
||||||
|
private final static String ZK_DEFAULT_TOPIC = "_consumer_offsets";
|
||||||
|
|
||||||
|
private final static String NO_OFFSET_CHANGE_TOPIC_IN_ZK = "NoOffsetChangeTopic";
|
||||||
|
|
||||||
|
private final static Long REAL_CLUSTER_ID_IN_MYSQL = 1L;
|
||||||
|
|
||||||
|
private final static Integer REAL_BROKER_ID_IN_ZK = 3;
|
||||||
|
|
||||||
|
private final static Long INVALID_CLUSTER_ID = -1L;
|
||||||
|
|
||||||
|
private final static Integer INVALID_PARTITION_ID = -1;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
@InjectMocks
|
||||||
|
private TopicService topicService;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private TopicManagerService topicManagerService;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private AppService appService;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private JmxService jmxService;
|
||||||
|
|
||||||
|
@Autowired
|
||||||
|
private TopicRequestMetricsDao topicRequestMetricsDao;
|
||||||
|
|
||||||
|
@BeforeMethod
|
||||||
|
public void setup() {
|
||||||
|
MockitoAnnotations.initMocks(this);
|
||||||
|
}
|
||||||
|
|
||||||
|
private TopicMetricsDO getTopicMetricsDO() {
|
||||||
|
TopicMetricsDO topicMetricsDO = new TopicMetricsDO();
|
||||||
|
topicMetricsDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
|
||||||
|
topicMetricsDO.setAppId("moduleTestAppId");
|
||||||
|
topicMetricsDO.setTopicName(REAL_TOPIC1_IN_ZK);
|
||||||
|
topicMetricsDO.setMetrics("");
|
||||||
|
topicMetricsDO.setGmtCreate(new Date());
|
||||||
|
return topicMetricsDO;
|
||||||
|
}
|
||||||
|
|
||||||
|
private TopicDO getTopicDO() {
|
||||||
|
TopicDO topicDO = new TopicDO();
|
||||||
|
topicDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL);
|
||||||
|
topicDO.setTopicName(REAL_TOPIC1_IN_ZK);
|
||||||
|
topicDO.setAppId("moduleTestAppId");
|
||||||
|
topicDO.setDescription(INVALID_TOPIC);
|
||||||
|
topicDO.setPeakBytesIn(100000L);
|
||||||
|
return topicDO;
|
||||||
|
}
|
||||||
|
|
||||||
|
private AppDO getAppDO() {
|
||||||
|
AppDO appDO = new AppDO();
|
||||||
|
appDO.setId(4L);
|
||||||
|
appDO.setAppId("moduleTestAppId");
|
||||||
|
appDO.setName("moduleTestApp");
|
||||||
|
appDO.setPassword("moduleTestApp");
|
||||||
|
appDO.setType(1);
|
||||||
|
appDO.setApplicant("admin");
|
||||||
|
appDO.setPrincipals("admin");
|
||||||
|
appDO.setDescription("moduleTestApp");
|
||||||
|
appDO.setCreateTime(new Date(1638786493173L));
|
||||||
|
appDO.setModifyTime(new Date(1638786493173L));
|
||||||
|
return appDO;
|
||||||
|
}
|
||||||
|
|
||||||
|
public ClusterDO getClusterDO() {
|
||||||
|
ClusterDO clusterDO = new ClusterDO();
|
||||||
|
clusterDO.setId(REAL_CLUSTER_ID_IN_MYSQL);
|
||||||
|
clusterDO.setClusterName("LogiKM_moduleTest");
|
||||||
|
clusterDO.setZookeeper("10.190.46.198:2181,10.190.14.237:2181,10.190.50.65:2181/xg");
|
||||||
|
clusterDO.setBootstrapServers("10.190.46.198:9093,10.190.14.237:9093,10.190.50.65:9093");
|
||||||
|
clusterDO.setSecurityProperties("{ \t\"security.protocol\": \"SASL_PLAINTEXT\", \t\"sasl.mechanism\": \"PLAIN\", \t\"sasl.jaas.config\": \"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\"dkm_admin\\\" password=\\\"km_kMl4N8as1Kp0CCY\\\";\" }");
|
||||||
|
clusterDO.setStatus(1);
|
||||||
|
clusterDO.setGmtCreate(new Date());
|
||||||
|
clusterDO.setGmtModify(new Date());
|
||||||
|
return clusterDO;
|
||||||
|
}
|
||||||
|
|
||||||
|
private TopicDataSampleDTO getTopicDataSampleDTO() {
|
||||||
|
TopicDataSampleDTO topicDataSampleDTO = new TopicDataSampleDTO();
|
||||||
|
topicDataSampleDTO.setPartitionId(0);
|
||||||
|
topicDataSampleDTO.setOffset(0L);
|
||||||
|
topicDataSampleDTO.setTimeout(5000);
|
||||||
|
topicDataSampleDTO.setTruncate(true);
|
||||||
|
topicDataSampleDTO.setMaxMsgNum(90);
|
||||||
|
return topicDataSampleDTO;
|
||||||
|
}
|
||||||
|
|
||||||
|
private TopicMetrics getTopicMetrics() {
|
||||||
|
TopicMetrics topicMetrics = new TopicMetrics(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC1_IN_ZK);
|
||||||
|
String metrics = "{\"TotalFetchRequestsPerSecFiveMinuteRate\":4.132236103122026,\"BytesRejectedPerSecFiveMinuteRate\":0.0,\"TotalFetchRequestsPerSecFifteenMinuteRate\":1.5799208507558833,\"ProduceTotalTimeMs98thPercentile\":0.0,\"MessagesInPerSecMeanRate\":0.0,\"ProduceTotalTimeMs75thPercentile\":0.0,\"ProduceTotalTimeMs99thPercentile\":0.0,\"TotalProduceRequestsPerSecOneMinuteRate\":0.0,\"FailedProduceRequestsPerSecFifteenMinuteRate\":0.0,\"BytesInPerSecMeanRate\":0.0,\"TotalProduceRequestsPerSecFiveMinuteRate\":0.0,\"FetchConsumerTotalTimeMs999thPercentile\":0.0,\"FetchConsumerTotalTimeMs98thPercentile\":0.0,\"FetchConsumerTotalTimeMsMean\":0.0,\"FetchConsumerTotalTimeMs99thPercentile\":0.0,\"FailedFetchRequestsPerSecFifteenMinuteRate\":0.0,\"MessagesInPerSecFiveMinuteRate\":0.0,\"RequestHandlerAvgIdlePercentOneMinuteRate\":0.999221766772746,\"ProduceTotalTimeMsMean\":0.0,\"BytesInPerSecFiveMinuteRate\":0.0,\"FailedProduceRequestsPerSecMeanRate\":0.0,\"FailedFetchRequestsPerSecMeanRate\":0.0,\"FailedProduceRequestsPerSecFiveMinuteRate\":0.0,\"BytesOutPerSecFifteenMinuteRate\":0.0,\"BytesInPerSecOneMinuteRate\":0.0,\"BytesOutPerSecFiveMinuteRate\":0.0,\"HealthScore\":90,\"FailedFetchRequestsPerSecOneMinuteRate\":0.0,\"MessagesInPerSecOneMinuteRate\":0.0,\"BytesRejectedPerSecFifteenMinuteRate\":0.0,\"FailedFetchRequestsPerSecFiveMinuteRate\":0.0,\"RequestHandlerAvgIdlePercentFiveMinuteRate\":0.999803118809842,\"BytesOutPerSecOneMinuteRate\":0.0,\"ResponseQueueSizeValue\":0,\"MessagesInPerSecFifteenMinuteRate\":0.0,\"TotalProduceRequestsPerSecMeanRate\":0.0,\"BytesRejectedPerSecMeanRate\":0.0,\"TotalFetchRequestsPerSecMeanRate\":1.2674449706628523,\"NetworkProcessorAvgIdlePercentValue\":1.0,\"TotalFetchRequestsPerSecOneMinuteRate\":10.457259856316893,\"BytesInPerSecFifteenMinuteRate\":0.0,\"BytesOutPerSecMeanRate\":0.0,\"TotalProduceRequestsPerSecFifteenMinuteRate\":0.0,\"FetchConsumerTotalTimeMs50thPercentile\":0.0,\"RequestHandlerAvgIdlePercentFifteenMinuteRate\":0.9999287809186348,\"FetchConsumerTotalTimeMs95thPercentile\":0.0,\"FailedProduceRequestsPerSecOneMinuteRate\":0.0,\"CreateTime\":1638792321071,\"FetchConsumerTotalTimeMs75thPercentile\":0.0,\"ProduceTotalTimeMs999thPercentile\":0.0,\"RequestQueueSizeValue\":0,\"ProduceTotalTimeMs50thPercentile\":0.0,\"BytesRejectedPerSecOneMinuteRate\":0.0,\"RequestHandlerAvgIdlePercentMeanRate\":0.9999649184090593,\"ProduceTotalTimeMs95thPercentile\":0.0}";
|
||||||
|
JSONObject jsonObject = JSON.parseObject(metrics);
|
||||||
|
Map<String, Object> metricsMap = new HashMap<>();
|
||||||
|
for (Map.Entry<String, Object> stringObjectEntry : jsonObject.entrySet()) {
|
||||||
|
metricsMap.put(stringObjectEntry.getKey(), stringObjectEntry.getValue());
|
||||||
|
}
|
||||||
|
topicMetrics.setMetricsMap(metricsMap);
|
||||||
|
return topicMetrics;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试从DB获取监控数据")
|
||||||
|
public void getTopicMetricsFromDBTest() {
|
||||||
|
List<TopicMetricsDO> list = topicService.getTopicMetricsFromDB(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC1_IN_ZK, new Date(0L), new Date());
|
||||||
|
Assert.assertFalse(list.isEmpty());
|
||||||
|
Assert.assertTrue(list.stream().allMatch(topicMetricsDO ->
|
||||||
|
topicMetricsDO.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL) &&
|
||||||
|
topicMetricsDO.getTopicName().equals(REAL_TOPIC1_IN_ZK)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void getTopicMetricsFromDBWithAppIdTest() {
|
||||||
|
List<TopicMetricsDTO> list = topicService.getTopicMetricsFromDB("1", REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC1_IN_ZK, new Date(0L), new Date());
|
||||||
|
Assert.assertFalse(list.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试获取指定时间段内的峰值的均值流量")
|
||||||
|
public void getMaxAvgBytesInFromDBTest() {
|
||||||
|
// 为空
|
||||||
|
getMaxAvgBytesInFromDB2NullTest();
|
||||||
|
// 获取成功
|
||||||
|
getMaxAvgBytesInFromDB2SuccessTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getMaxAvgBytesInFromDB2NullTest() {
|
||||||
|
Double result = topicService.getMaxAvgBytesInFromDB(REAL_CLUSTER_ID_IN_MYSQL, INVALID_TOPIC, new Date(0L), new Date());
|
||||||
|
Assert.assertNull(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getMaxAvgBytesInFromDB2SuccessTest() {
|
||||||
|
Double result = topicService.getMaxAvgBytesInFromDB(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC1_IN_ZK, new Date(0L), new Date());
|
||||||
|
Assert.assertNotNull(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "获取brokerId下所有的Topic及其对应的PartitionId")
|
||||||
|
public void getTopicPartitionIdMapTest() {
|
||||||
|
Map<String, List<Integer>> topicPartitionIdMap = topicService.getTopicPartitionIdMap(REAL_CLUSTER_ID_IN_MYSQL, 1);
|
||||||
|
Assert.assertFalse(topicPartitionIdMap.isEmpty());
|
||||||
|
Assert.assertTrue(topicPartitionIdMap.containsKey(REAL_TOPIC1_IN_ZK));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试获取 Topic 的 basic-info 信息")
|
||||||
|
public void getTopicBasicDTOTest() {
|
||||||
|
// TopicMetadata is Null
|
||||||
|
getTopicBasicDTO2TopicMetadataIsNull();
|
||||||
|
// TopicDO is Null
|
||||||
|
getTopicBasicDTO2TopicMetadata2TopicDOIsNull();
|
||||||
|
// TopicDO is not Null
|
||||||
|
getTopicBasicDTO2TopicMetadata2TopicDOIsNotNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicBasicDTO2TopicMetadataIsNull() {
|
||||||
|
TopicBasicDTO result = topicService.getTopicBasicDTO(REAL_CLUSTER_ID_IN_MYSQL, INVALID_TOPIC);
|
||||||
|
Assert.assertEquals(result.getClusterId(), Long.valueOf(REAL_CLUSTER_ID_IN_MYSQL));
|
||||||
|
Assert.assertEquals(result.getTopicName(), INVALID_TOPIC);
|
||||||
|
Assert.assertNull(result.getAppId());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicBasicDTO2TopicMetadata2TopicDOIsNull() {
|
||||||
|
Mockito.when(topicManagerService.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(null);
|
||||||
|
TopicBasicDTO result = topicService.getTopicBasicDTO(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC1_IN_ZK);
|
||||||
|
Assert.assertNotNull(result);
|
||||||
|
Assert.assertEquals(result.getClusterId(), Long.valueOf(REAL_CLUSTER_ID_IN_MYSQL));
|
||||||
|
Assert.assertEquals(result.getTopicName(), REAL_TOPIC1_IN_ZK);
|
||||||
|
Assert.assertNull(result.getDescription());
|
||||||
|
Assert.assertNull(result.getAppId());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicBasicDTO2TopicMetadata2TopicDOIsNotNull() {
|
||||||
|
TopicDO topicDO = getTopicDO();
|
||||||
|
Mockito.when(topicManagerService.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(topicDO);
|
||||||
|
Mockito.when(appService.getByAppId(Mockito.anyString())).thenReturn(null);
|
||||||
|
TopicBasicDTO result = topicService.getTopicBasicDTO(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC1_IN_ZK);
|
||||||
|
Assert.assertNotNull(result);
|
||||||
|
Assert.assertEquals(result.getClusterId(), Long.valueOf(REAL_CLUSTER_ID_IN_MYSQL));
|
||||||
|
Assert.assertEquals(result.getTopicName(), REAL_TOPIC1_IN_ZK);
|
||||||
|
Assert.assertEquals(result.getDescription(), topicDO.getDescription());
|
||||||
|
// appId不存在
|
||||||
|
Assert.assertNull(result.getAppId());
|
||||||
|
// appId存在
|
||||||
|
topicDO.setAppId("moduleTestAppId");
|
||||||
|
Mockito.when(topicManagerService.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(topicDO);
|
||||||
|
Mockito.when(appService.getByAppId(Mockito.anyString())).thenReturn(getAppDO());
|
||||||
|
TopicBasicDTO result2 = topicService.getTopicBasicDTO(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC1_IN_ZK);
|
||||||
|
Assert.assertNotNull(result2);
|
||||||
|
Assert.assertEquals(result2.getClusterId(), Long.valueOf(REAL_CLUSTER_ID_IN_MYSQL));
|
||||||
|
Assert.assertEquals(result2.getTopicName(), REAL_TOPIC1_IN_ZK);
|
||||||
|
Assert.assertEquals(result2.getDescription(), topicDO.getDescription());
|
||||||
|
Assert.assertEquals(result2.getAppId(), topicDO.getAppId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "获取Topic的PartitionState信息")
|
||||||
|
public void getTopicPartitionDTOTest() {
|
||||||
|
// result is emptyList
|
||||||
|
getTopicPartitionDTO2EmptyTest();
|
||||||
|
// needDetail is false
|
||||||
|
getTopicPartitionDTO2NeedDetailFalseTest();
|
||||||
|
// needDetail is true
|
||||||
|
getTopicPartitionDTO2NeedDetailTrueTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicPartitionDTO2EmptyTest() {
|
||||||
|
List<TopicPartitionDTO> list = topicService.getTopicPartitionDTO(null, null, true);
|
||||||
|
Assert.assertTrue(list.isEmpty());
|
||||||
|
|
||||||
|
ClusterDO clusterDO = new ClusterDO();
|
||||||
|
clusterDO.setId(REAL_CLUSTER_ID_IN_MYSQL);
|
||||||
|
List<TopicPartitionDTO> list2 = topicService.getTopicPartitionDTO(clusterDO, INVALID_TOPIC, true);
|
||||||
|
Assert.assertTrue(list2.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicPartitionDTO2NeedDetailFalseTest() {
|
||||||
|
Map<Integer, PartitionAttributeDTO> map = new HashMap<>();
|
||||||
|
PartitionAttributeDTO partitionAttributeDTO1 = new PartitionAttributeDTO();
|
||||||
|
partitionAttributeDTO1.setLogSize(0L);
|
||||||
|
map.put(0, partitionAttributeDTO1);
|
||||||
|
map.put(1, null);
|
||||||
|
Mockito.when(jmxService.getPartitionAttribute(
|
||||||
|
Mockito.anyLong(), Mockito.anyString(), Mockito.anyList())).thenReturn(map);
|
||||||
|
|
||||||
|
ClusterDO clusterDO = getClusterDO();
|
||||||
|
List<TopicPartitionDTO> list = topicService.getTopicPartitionDTO(clusterDO, REAL_TOPIC1_IN_ZK, false);
|
||||||
|
Assert.assertFalse(list.isEmpty());
|
||||||
|
Assert.assertEquals(list.size(), 2);
|
||||||
|
Assert.assertTrue(list.stream().allMatch(topicPartitionDTO ->
|
||||||
|
topicPartitionDTO.getBeginningOffset() == null &&
|
||||||
|
topicPartitionDTO.getEndOffset() == null));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicPartitionDTO2NeedDetailTrueTest() {
|
||||||
|
Map<Integer, PartitionAttributeDTO> map = new HashMap<>();
|
||||||
|
PartitionAttributeDTO partitionAttributeDTO1 = new PartitionAttributeDTO();
|
||||||
|
partitionAttributeDTO1.setLogSize(0L);
|
||||||
|
map.put(0, partitionAttributeDTO1);
|
||||||
|
map.put(1, null);
|
||||||
|
Mockito.when(jmxService.getPartitionAttribute(
|
||||||
|
Mockito.anyLong(), Mockito.anyString(), Mockito.anyList())).thenReturn(map);
|
||||||
|
|
||||||
|
ClusterDO clusterDO = getClusterDO();
|
||||||
|
List<TopicPartitionDTO> list = topicService.getTopicPartitionDTO(clusterDO, REAL_TOPIC1_IN_ZK, true);
|
||||||
|
Assert.assertFalse(list.isEmpty());
|
||||||
|
Assert.assertEquals(list.size(), 2);
|
||||||
|
Assert.assertTrue(list.stream().allMatch(topicPartitionDTO ->
|
||||||
|
topicPartitionDTO.getBeginningOffset() != null &&
|
||||||
|
topicPartitionDTO.getEndOffset() != null));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void getTopicMetricsFromJMXTest() {
|
||||||
|
Mockito.when(jmxService.getTopicMetrics(
|
||||||
|
Mockito.anyLong(), Mockito.anyString(), Mockito.anyInt(), Mockito.anyBoolean())).thenReturn(new TopicMetrics(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC1_IN_ZK));
|
||||||
|
BaseMetrics result = topicService.getTopicMetricsFromJMX(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC1_IN_ZK, 200, true);
|
||||||
|
Assert.assertNotNull(result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试获取Topic的分区的offset")
|
||||||
|
public void getPartitionOffsetTest() {
|
||||||
|
// 结果为空
|
||||||
|
getPartitionOffset2EmptyTest();
|
||||||
|
// 获取成功
|
||||||
|
getPartitionOffset2SuccessTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getPartitionOffset2EmptyTest() {
|
||||||
|
ClusterDO clusterDO = getClusterDO();
|
||||||
|
Map<TopicPartition, Long> partitionOffset = topicService.getPartitionOffset(
|
||||||
|
null, null, OffsetPosEnum.BEGINNING);
|
||||||
|
Assert.assertTrue(partitionOffset.isEmpty());
|
||||||
|
|
||||||
|
Map<TopicPartition, Long> partitionOffset2 = topicService.getPartitionOffset(
|
||||||
|
clusterDO, INVALID_TOPIC, OffsetPosEnum.BEGINNING);
|
||||||
|
Assert.assertTrue(partitionOffset2.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getPartitionOffset2SuccessTest() {
|
||||||
|
ClusterDO clusterDO = getClusterDO();
|
||||||
|
// 获取beginning offset
|
||||||
|
Map<TopicPartition, Long> partitionOffset1 = topicService.getPartitionOffset(
|
||||||
|
clusterDO, REAL_TOPIC1_IN_ZK, OffsetPosEnum.BEGINNING);
|
||||||
|
Assert.assertFalse(partitionOffset1.isEmpty());
|
||||||
|
// 获取end offset
|
||||||
|
Map<TopicPartition, Long> partitionOffset2 = topicService.getPartitionOffset(
|
||||||
|
clusterDO, REAL_TOPIC1_IN_ZK, OffsetPosEnum.END);
|
||||||
|
Assert.assertFalse(partitionOffset2.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试获取Topic概览信息,参数clusterId, brokerId")
|
||||||
|
public void getTopicOverviewListTest() {
|
||||||
|
// 结果为空
|
||||||
|
getTopicOverviewList2EmptyTest();
|
||||||
|
// 获取成功
|
||||||
|
getTopicOverviewList2SuccessTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicOverviewList2EmptyTest() {
|
||||||
|
List<TopicOverview> topicOverviewList = topicService.getTopicOverviewList(null, 1);
|
||||||
|
Assert.assertTrue(topicOverviewList.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicOverviewList2SuccessTest() {
|
||||||
|
List<TopicOverview> topicOverviewList = topicService.getTopicOverviewList(REAL_CLUSTER_ID_IN_MYSQL, 1);
|
||||||
|
Assert.assertFalse(topicOverviewList.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试获取Topic概览信息,参数clusterId, topicNameList")
|
||||||
|
public void getTopicOverviewListWithTopicList() {
|
||||||
|
// 结果为空
|
||||||
|
getTopicOverviewListWithTopicList2EmptyTest();
|
||||||
|
|
||||||
|
// topicDOList is null,appDOList is null, metrics is null
|
||||||
|
getTopicOverviewListWithTopicList2TopicAndApp1Test();
|
||||||
|
|
||||||
|
// topicDOList is null,appDOList is null, metrics is not null
|
||||||
|
getTopicOverviewListWithTopicList2TopicAndApp2Test();
|
||||||
|
|
||||||
|
// topicDOList is null,appDOList is not null, metrics is null
|
||||||
|
getTopicOverviewListWithTopicList2TopicAndApp3Test();
|
||||||
|
|
||||||
|
// topicDOList is null,appDOList is not null, metrics is not null
|
||||||
|
getTopicOverviewListWithTopicList2TopicAndApp4Test();
|
||||||
|
|
||||||
|
// topicDOList is not null,appDOList is null, metrics is null
|
||||||
|
getTopicOverviewListWithTopicList2TopicAndApp5Test();
|
||||||
|
|
||||||
|
// topicDOList is not null,appDOList is null, metrics is not null
|
||||||
|
getTopicOverviewListWithTopicList2TopicAndApp6Test();
|
||||||
|
|
||||||
|
// topicDOList is not null,appDOList is not null, metrics is null
|
||||||
|
getTopicOverviewListWithTopicList2TopicAndApp7Test();
|
||||||
|
|
||||||
|
// topicDOList is not null,appDOList is not null, metrics is not null
|
||||||
|
getTopicOverviewListWithTopicList2TopicAndApp8Test();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicOverviewListWithTopicList2EmptyTest() {
|
||||||
|
List<TopicOverview> topicOverviewList = topicService.getTopicOverviewList(null, Arrays.asList(REAL_TOPIC1_IN_ZK, ZK_DEFAULT_TOPIC, INVALID_TOPIC));
|
||||||
|
Assert.assertTrue(topicOverviewList.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicOverviewListWithTopicList2TopicAndApp1Test() {
|
||||||
|
Mockito.when(topicManagerService.getByClusterIdFromCache(Mockito.anyLong())).thenReturn(null);
|
||||||
|
Mockito.when(appService.listAll()).thenReturn(null);
|
||||||
|
Mockito.when(jmxService.getTopicMetrics(
|
||||||
|
Mockito.anyLong(), Mockito.anyString(), Mockito.anyInt(), Mockito.anyBoolean())).thenReturn(null);
|
||||||
|
|
||||||
|
List<String> topics = Arrays.asList(REAL_TOPIC1_IN_ZK, ZK_DEFAULT_TOPIC, INVALID_TOPIC);
|
||||||
|
List<TopicOverview> topicOverviewList = topicService.getTopicOverviewList(REAL_CLUSTER_ID_IN_MYSQL, topics);
|
||||||
|
Assert.assertFalse(topicOverviewList.isEmpty());
|
||||||
|
Assert.assertTrue(topicOverviewList.stream().allMatch(topicOverview ->
|
||||||
|
topicOverview.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL) &&
|
||||||
|
topics.contains(topicOverview.getTopicName()) &&
|
||||||
|
topicOverview.getAppId() == null &&
|
||||||
|
topicOverview.getAppName() == null &&
|
||||||
|
topicOverview.getByteIn() == null));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicOverviewListWithTopicList2TopicAndApp2Test() {
|
||||||
|
Mockito.when(topicManagerService.getByClusterIdFromCache(Mockito.anyLong())).thenReturn(null);
|
||||||
|
Mockito.when(appService.listAll()).thenReturn(null);
|
||||||
|
TopicMetrics topicMetrics = getTopicMetrics();
|
||||||
|
Mockito.when(jmxService.getTopicMetrics(
|
||||||
|
Mockito.anyLong(), Mockito.anyString(), Mockito.anyInt(), Mockito.anyBoolean())).
|
||||||
|
thenReturn(topicMetrics);
|
||||||
|
|
||||||
|
List<String> topics = Arrays.asList(REAL_TOPIC1_IN_ZK, ZK_DEFAULT_TOPIC, INVALID_TOPIC);
|
||||||
|
List<TopicOverview> topicOverviewList = topicService.getTopicOverviewList(REAL_CLUSTER_ID_IN_MYSQL, topics);
|
||||||
|
Assert.assertFalse(topicOverviewList.isEmpty());
|
||||||
|
Assert.assertTrue(topicOverviewList.stream().allMatch(topicOverview ->
|
||||||
|
topicOverview.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL) &&
|
||||||
|
topics.contains(topicOverview.getTopicName()) &&
|
||||||
|
topicOverview.getAppId() == null &&
|
||||||
|
topicOverview.getAppName() == null &&
|
||||||
|
topicOverview.getByteIn() != null));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicOverviewListWithTopicList2TopicAndApp3Test() {
|
||||||
|
Mockito.when(topicManagerService.getByClusterIdFromCache(Mockito.anyLong())).thenReturn(null);
|
||||||
|
AppDO appDO = getAppDO();
|
||||||
|
Mockito.when(appService.listAll()).thenReturn(Arrays.asList(appDO));
|
||||||
|
|
||||||
|
Mockito.when(jmxService.getTopicMetrics(
|
||||||
|
Mockito.anyLong(), Mockito.anyString(), Mockito.anyInt(), Mockito.anyBoolean())).
|
||||||
|
thenReturn(null);
|
||||||
|
|
||||||
|
List<String> topics = Arrays.asList(REAL_TOPIC1_IN_ZK, ZK_DEFAULT_TOPIC, INVALID_TOPIC);
|
||||||
|
List<TopicOverview> topicOverviewList = topicService.getTopicOverviewList(REAL_CLUSTER_ID_IN_MYSQL, topics);
|
||||||
|
Assert.assertFalse(topicOverviewList.isEmpty());
|
||||||
|
Assert.assertTrue(topicOverviewList.stream().allMatch(topicOverview ->
|
||||||
|
topicOverview.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL) &&
|
||||||
|
topics.contains(topicOverview.getTopicName()) &&
|
||||||
|
topicOverview.getAppId() == null &&
|
||||||
|
topicOverview.getAppName() == null &&
|
||||||
|
topicOverview.getByteIn() == null));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicOverviewListWithTopicList2TopicAndApp4Test() {
|
||||||
|
Mockito.when(topicManagerService.getByClusterIdFromCache(Mockito.anyLong())).thenReturn(null);
|
||||||
|
AppDO appDO = getAppDO();
|
||||||
|
Mockito.when(appService.listAll()).thenReturn(Arrays.asList(appDO));
|
||||||
|
|
||||||
|
TopicMetrics topicMetrics = getTopicMetrics();
|
||||||
|
Mockito.when(jmxService.getTopicMetrics(
|
||||||
|
Mockito.anyLong(), Mockito.anyString(), Mockito.anyInt(), Mockito.anyBoolean())).
|
||||||
|
thenReturn(topicMetrics);
|
||||||
|
|
||||||
|
List<String> topics = Arrays.asList(REAL_TOPIC1_IN_ZK, ZK_DEFAULT_TOPIC, INVALID_TOPIC);
|
||||||
|
List<TopicOverview> topicOverviewList = topicService.getTopicOverviewList(REAL_CLUSTER_ID_IN_MYSQL, topics);
|
||||||
|
Assert.assertFalse(topicOverviewList.isEmpty());
|
||||||
|
Assert.assertTrue(topicOverviewList.stream().allMatch(topicOverview ->
|
||||||
|
topicOverview.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL) &&
|
||||||
|
topics.contains(topicOverview.getTopicName()) &&
|
||||||
|
topicOverview.getAppId() == null &&
|
||||||
|
topicOverview.getAppName() == null &&
|
||||||
|
topicOverview.getByteIn() != null));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicOverviewListWithTopicList2TopicAndApp5Test() {
|
||||||
|
TopicDO topicDO = getTopicDO();
|
||||||
|
Mockito.when(topicManagerService.getByClusterIdFromCache(Mockito.anyLong())).thenReturn(Arrays.asList(topicDO));
|
||||||
|
Mockito.when(appService.listAll()).thenReturn(null);
|
||||||
|
Mockito.when(jmxService.getTopicMetrics(
|
||||||
|
Mockito.anyLong(), Mockito.anyString(), Mockito.anyInt(), Mockito.anyBoolean())).thenReturn(null);
|
||||||
|
|
||||||
|
List<String> topics = Arrays.asList(REAL_TOPIC1_IN_ZK, ZK_DEFAULT_TOPIC, INVALID_TOPIC);
|
||||||
|
List<TopicOverview> topicOverviewList = topicService.getTopicOverviewList(REAL_CLUSTER_ID_IN_MYSQL, topics);
|
||||||
|
Assert.assertFalse(topicOverviewList.isEmpty());
|
||||||
|
Assert.assertTrue(topicOverviewList.stream().allMatch(topicOverview ->
|
||||||
|
topicOverview.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL) &&
|
||||||
|
topics.contains(topicOverview.getTopicName()) &&
|
||||||
|
topicOverview.getAppId().equals(topicDO.getAppId()) &&
|
||||||
|
topicOverview.getAppName() == null &&
|
||||||
|
topicOverview.getByteIn() == null));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicOverviewListWithTopicList2TopicAndApp6Test() {
|
||||||
|
TopicDO topicDO = getTopicDO();
|
||||||
|
Mockito.when(topicManagerService.getByClusterIdFromCache(Mockito.anyLong())).thenReturn(Arrays.asList(topicDO));
|
||||||
|
Mockito.when(appService.listAll()).thenReturn(null);
|
||||||
|
TopicMetrics topicMetrics = getTopicMetrics();
|
||||||
|
Mockito.when(jmxService.getTopicMetrics(
|
||||||
|
Mockito.anyLong(), Mockito.anyString(), Mockito.anyInt(), Mockito.anyBoolean())).
|
||||||
|
thenReturn(topicMetrics);
|
||||||
|
|
||||||
|
List<String> topics = Arrays.asList(REAL_TOPIC1_IN_ZK, ZK_DEFAULT_TOPIC, INVALID_TOPIC);
|
||||||
|
List<TopicOverview> topicOverviewList = topicService.getTopicOverviewList(REAL_CLUSTER_ID_IN_MYSQL, topics);
|
||||||
|
Assert.assertFalse(topicOverviewList.isEmpty());
|
||||||
|
Assert.assertTrue(topicOverviewList.stream().allMatch(topicOverview ->
|
||||||
|
topicOverview.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL) &&
|
||||||
|
topics.contains(topicOverview.getTopicName()) &&
|
||||||
|
topicOverview.getAppId().equals(topicDO.getAppId()) &&
|
||||||
|
topicOverview.getAppName() == null &&
|
||||||
|
topicOverview.getByteIn() != null));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicOverviewListWithTopicList2TopicAndApp7Test() {
|
||||||
|
TopicDO topicDO = getTopicDO();
|
||||||
|
Mockito.when(topicManagerService.getByClusterIdFromCache(Mockito.anyLong())).thenReturn(Arrays.asList(topicDO));
|
||||||
|
AppDO appDO = getAppDO();
|
||||||
|
Mockito.when(appService.listAll()).thenReturn(Arrays.asList(appDO));
|
||||||
|
|
||||||
|
Mockito.when(jmxService.getTopicMetrics(
|
||||||
|
Mockito.anyLong(), Mockito.anyString(), Mockito.anyInt(), Mockito.anyBoolean())).
|
||||||
|
thenReturn(null);
|
||||||
|
|
||||||
|
List<String> topics = Arrays.asList(REAL_TOPIC1_IN_ZK, ZK_DEFAULT_TOPIC, INVALID_TOPIC);
|
||||||
|
List<TopicOverview> topicOverviewList = topicService.getTopicOverviewList(REAL_CLUSTER_ID_IN_MYSQL, topics);
|
||||||
|
Assert.assertFalse(topicOverviewList.isEmpty());
|
||||||
|
Assert.assertTrue(topicOverviewList.stream().allMatch(topicOverview ->
|
||||||
|
topicOverview.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL) &&
|
||||||
|
topics.contains(topicOverview.getTopicName()) &&
|
||||||
|
topicOverview.getAppId().equals(topicDO.getAppId()) &&
|
||||||
|
topicOverview.getAppName().equals(appDO.getName()) &&
|
||||||
|
topicOverview.getByteIn() == null));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicOverviewListWithTopicList2TopicAndApp8Test() {
|
||||||
|
TopicDO topicDO = getTopicDO();
|
||||||
|
Mockito.when(topicManagerService.getByClusterIdFromCache(Mockito.anyLong())).thenReturn(Arrays.asList(topicDO));
|
||||||
|
AppDO appDO = getAppDO();
|
||||||
|
Mockito.when(appService.listAll()).thenReturn(Arrays.asList(appDO));
|
||||||
|
|
||||||
|
TopicMetrics topicMetrics = getTopicMetrics();
|
||||||
|
Mockito.when(jmxService.getTopicMetrics(
|
||||||
|
Mockito.anyLong(), Mockito.anyString(), Mockito.anyInt(), Mockito.anyBoolean())).
|
||||||
|
thenReturn(topicMetrics);
|
||||||
|
|
||||||
|
List<String> topics = Arrays.asList(REAL_TOPIC1_IN_ZK, ZK_DEFAULT_TOPIC, INVALID_TOPIC);
|
||||||
|
List<TopicOverview> topicOverviewList = topicService.getTopicOverviewList(REAL_CLUSTER_ID_IN_MYSQL, topics);
|
||||||
|
Assert.assertFalse(topicOverviewList.isEmpty());
|
||||||
|
Assert.assertTrue(topicOverviewList.stream().allMatch(topicOverview ->
|
||||||
|
topicOverview.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL) &&
|
||||||
|
topics.contains(topicOverview.getTopicName()) &&
|
||||||
|
topicOverview.getAppId().equals(topicDO.getAppId()) &&
|
||||||
|
topicOverview.getAppName().equals(appDO.getName()) &&
|
||||||
|
topicOverview.getByteIn() != null));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试获取指定时间的offset信息")
|
||||||
|
public void getPartitionOffsetListTest() {
|
||||||
|
// 结果为空
|
||||||
|
getPartitionOffsetList2EmptyTest();
|
||||||
|
// 获取成功
|
||||||
|
getPartitionOffsetList2SuccessTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getPartitionOffsetList2EmptyTest() {
|
||||||
|
ClusterDO clusterDO = getClusterDO();
|
||||||
|
List<PartitionOffsetDTO> list = topicService.getPartitionOffsetList(clusterDO, INVALID_TOPIC, 0L);
|
||||||
|
Assert.assertTrue(list.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getPartitionOffsetList2SuccessTest() {
|
||||||
|
ClusterDO clusterDO = getClusterDO();
|
||||||
|
List<PartitionOffsetDTO> list = topicService.getPartitionOffsetList(clusterDO, REAL_TOPIC1_IN_ZK, 0L);
|
||||||
|
Assert.assertFalse(list.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test()
|
||||||
|
public void getTopicPartitionStateTest() {
|
||||||
|
// 结果为空
|
||||||
|
getTopicPartitionState2EmptyTest();
|
||||||
|
|
||||||
|
// 获取结果成功
|
||||||
|
getTopicPartitionState2SuccessTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void getTopicPartitionState2EmptyTest() {
|
||||||
|
Map<String, List<PartitionState>> map1 =
|
||||||
|
topicService.getTopicPartitionState(null, REAL_BROKER_ID_IN_ZK);
|
||||||
|
Assert.assertTrue(map1.isEmpty());
|
||||||
|
|
||||||
|
Map<String, List<PartitionState>> map2 =
|
||||||
|
topicService.getTopicPartitionState(INVALID_CLUSTER_ID, REAL_BROKER_ID_IN_ZK);
|
||||||
|
Assert.assertTrue(map2.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 共有三个topic, REAL_TOPIC1_IN_ZK, REAL_TOPIC2_IN_ZK, ZK_DEFAULT_TOPIC
|
||||||
|
*/
|
||||||
|
private void getTopicPartitionState2SuccessTest() {
|
||||||
|
Map<String, List<PartitionState>> map1 =
|
||||||
|
topicService.getTopicPartitionState(REAL_CLUSTER_ID_IN_MYSQL, REAL_BROKER_ID_IN_ZK);
|
||||||
|
Assert.assertFalse(map1.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试数据采样")
|
||||||
|
public void fetchTopicDataTest() {
|
||||||
|
// invalid partitionId
|
||||||
|
fetchTopicData2InvalidPartitionId();
|
||||||
|
// 指定了offset,截断
|
||||||
|
fetchTopicData2OffsetAndTruncate();
|
||||||
|
// 指定了offset,未截断
|
||||||
|
fetchTopicData2OffsetAndNoTruncate();
|
||||||
|
// 未指定offset, 返回空
|
||||||
|
fetchTopicData2NoOffset2Empty();
|
||||||
|
// 未指定offset,截断
|
||||||
|
fetchTopicData2NoOffsetAndTruncate();
|
||||||
|
// 未指定offset,未截断
|
||||||
|
fetchTopicData2NoOffsetAndNoTruncate();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void fetchTopicData2InvalidPartitionId() {
|
||||||
|
ClusterDO clusterDO = getClusterDO();
|
||||||
|
TopicDataSampleDTO topicDataSampleDTO = getTopicDataSampleDTO();
|
||||||
|
topicDataSampleDTO.setPartitionId(INVALID_PARTITION_ID);
|
||||||
|
List<String> result = topicService.fetchTopicData(clusterDO, REAL_TOPIC1_IN_ZK, topicDataSampleDTO);
|
||||||
|
Assert.assertTrue(result.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void fetchTopicData2NoOffsetAndTruncate() {
|
||||||
|
ClusterDO clusterDO = getClusterDO();
|
||||||
|
TopicDataSampleDTO topicDataSampleDTO = getTopicDataSampleDTO();
|
||||||
|
topicDataSampleDTO.setOffset(null);
|
||||||
|
List<String> result = topicService.fetchTopicData(clusterDO, REAL_TOPIC1_IN_ZK, topicDataSampleDTO);
|
||||||
|
Assert.assertFalse(result.isEmpty());
|
||||||
|
Assert.assertTrue(result.stream().allMatch(
|
||||||
|
value -> value.length() <= TopicSampleConstant.MAX_DATA_LENGTH_UNIT_BYTE));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void fetchTopicData2NoOffsetAndNoTruncate() {
|
||||||
|
ClusterDO clusterDO = getClusterDO();
|
||||||
|
TopicDataSampleDTO topicDataSampleDTO = getTopicDataSampleDTO();
|
||||||
|
topicDataSampleDTO.setOffset(null);
|
||||||
|
topicDataSampleDTO.setTruncate(false);
|
||||||
|
List<String> result = topicService.fetchTopicData(clusterDO, REAL_TOPIC1_IN_ZK, topicDataSampleDTO);
|
||||||
|
Assert.assertFalse(result.isEmpty());
|
||||||
|
Assert.assertTrue(result.stream().allMatch(
|
||||||
|
value -> value.length() > TopicSampleConstant.MAX_DATA_LENGTH_UNIT_BYTE));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void fetchTopicData2OffsetAndTruncate() {
|
||||||
|
ClusterDO clusterDO = getClusterDO();
|
||||||
|
TopicDataSampleDTO topicDataSampleDTO = getTopicDataSampleDTO();
|
||||||
|
List<String> result = topicService.fetchTopicData(clusterDO, REAL_TOPIC1_IN_ZK, topicDataSampleDTO);
|
||||||
|
Assert.assertFalse(result.isEmpty());
|
||||||
|
Assert.assertTrue(result.stream().allMatch(
|
||||||
|
value -> value.length() <= TopicSampleConstant.MAX_DATA_LENGTH_UNIT_BYTE));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void fetchTopicData2OffsetAndNoTruncate() {
|
||||||
|
ClusterDO clusterDO = getClusterDO();
|
||||||
|
TopicDataSampleDTO topicDataSampleDTO = getTopicDataSampleDTO();
|
||||||
|
topicDataSampleDTO.setTruncate(false);
|
||||||
|
List<String> result = topicService.fetchTopicData(clusterDO, REAL_TOPIC1_IN_ZK, topicDataSampleDTO);
|
||||||
|
Assert.assertFalse(result.isEmpty());
|
||||||
|
Assert.assertTrue(result.stream().allMatch(
|
||||||
|
value -> value.length() > TopicSampleConstant.MAX_DATA_LENGTH_UNIT_BYTE));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void fetchTopicData2NoOffset2Empty() {
|
||||||
|
ClusterDO clusterDO = getClusterDO();
|
||||||
|
TopicDataSampleDTO topicDataSampleDTO = getTopicDataSampleDTO();
|
||||||
|
topicDataSampleDTO.setOffset(null);
|
||||||
|
topicDataSampleDTO.setTimeout(-1);
|
||||||
|
List<String> result = topicService.fetchTopicData(clusterDO, REAL_TOPIC1_IN_ZK, topicDataSampleDTO);
|
||||||
|
Assert.assertTrue(result.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试从数据库中获取requestMetrics指标")
|
||||||
|
public void getTopicRequestMetricsFromDBTest() {
|
||||||
|
TopicMetricsDO topicMetricsDO1 = getTopicMetricsDO();
|
||||||
|
topicRequestMetricsDao.add(topicMetricsDO1);
|
||||||
|
|
||||||
|
Date startTime = new Date(0L);
|
||||||
|
Date endTime = new Date();
|
||||||
|
List<TopicMetricsDO> result = topicService.getTopicRequestMetricsFromDB(
|
||||||
|
topicMetricsDO1.getClusterId(), topicMetricsDO1.getTopicName(), startTime, endTime);
|
||||||
|
Assert.assertFalse(result.isEmpty());
|
||||||
|
Assert.assertTrue(result.stream().allMatch(topicMetricsDO ->
|
||||||
|
topicMetricsDO.getClusterId().equals(topicMetricsDO1.getClusterId()) &&
|
||||||
|
topicMetricsDO.getTopicName().equals(topicMetricsDO1.getTopicName()) &&
|
||||||
|
topicMetricsDO.getGmtCreate().after(startTime) &&
|
||||||
|
topicMetricsDO.getGmtCreate().before(endTime)));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试获取topic的broker列表")
|
||||||
|
public void getTopicBrokerListTest() {
|
||||||
|
List<TopicBrokerDTO> topicBrokerList = topicService.getTopicBrokerList(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC2_IN_ZK);
|
||||||
|
Assert.assertFalse(topicBrokerList.isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(description = "测试topic是否有数据写入")
|
||||||
|
public void checkTopicOffsetChangedTest() {
|
||||||
|
// physicalCluster does not exist
|
||||||
|
checkTopicOffsetChanged2ClusterNotExistTest();
|
||||||
|
// endOffsetMap is empty
|
||||||
|
checkTopicOffsetChanged2UnknownTest();
|
||||||
|
// dtoList is not empty and result is Yes
|
||||||
|
checkTopicOffsetChanged2dtoListNotNullAndYesTest();
|
||||||
|
// dtoList is empty and result is No
|
||||||
|
checkTopicOffsetChanged2NoTest();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkTopicOffsetChanged2ClusterNotExistTest() {
|
||||||
|
Result<TopicOffsetChangedEnum> result =
|
||||||
|
topicService.checkTopicOffsetChanged(INVALID_CLUSTER_ID, REAL_TOPIC1_IN_ZK, 0L);
|
||||||
|
Assert.assertEquals(result.getCode(), ResultStatus.CLUSTER_NOT_EXIST.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkTopicOffsetChanged2UnknownTest() {
|
||||||
|
Result<TopicOffsetChangedEnum> result =
|
||||||
|
topicService.checkTopicOffsetChanged(REAL_CLUSTER_ID_IN_MYSQL, INVALID_TOPIC, 0L);
|
||||||
|
Assert.assertEquals(result.getData().getCode(), TopicOffsetChangedEnum.UNKNOWN.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkTopicOffsetChanged2dtoListNotNullAndYesTest() {
|
||||||
|
Result<TopicOffsetChangedEnum> result = topicService.checkTopicOffsetChanged(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
REAL_TOPIC1_IN_ZK,
|
||||||
|
System.currentTimeMillis());
|
||||||
|
Assert.assertNotNull(result);
|
||||||
|
Assert.assertEquals(result.getData().getCode(), TopicOffsetChangedEnum.YES.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
private void checkTopicOffsetChanged2NoTest() {
|
||||||
|
Result<TopicOffsetChangedEnum> result = topicService.checkTopicOffsetChanged(
|
||||||
|
REAL_CLUSTER_ID_IN_MYSQL,
|
||||||
|
NO_OFFSET_CHANGE_TOPIC_IN_ZK,
|
||||||
|
System.currentTimeMillis());
|
||||||
|
Assert.assertNotNull(result);
|
||||||
|
Assert.assertEquals(result.getData().getCode(), TopicOffsetChangedEnum.NO.getCode());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user