diff --git a/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/AnalysisServiceTest.java b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/AnalysisServiceTest.java new file mode 100644 index 00000000..b4d3657d --- /dev/null +++ b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/AnalysisServiceTest.java @@ -0,0 +1,54 @@ +package com.xiaojukeji.kafka.manager.service.service; + +import com.xiaojukeji.kafka.manager.common.entity.ao.analysis.AnalysisBrokerDTO; +import com.xiaojukeji.kafka.manager.service.config.BaseTest; + +import org.springframework.beans.factory.annotation.Autowired; +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * @author xuguang + * @Date 2021/12/23 + */ +public class AnalysisServiceTest extends BaseTest { + + private final static Long REAL_CLUSTER_ID_IN_MYSQL = 1L; + + private final static Integer REAL_BROKER_ID_IN_ZK = 1; + + private final static Long INVALID_CLUSTER_ID = -1L; + + @Autowired + private AnalysisService analysisService; + + @Test + public void doAnalysisBrokerTest() { + // brokerMetrics is null + doAnalysisBroker2brokerMetricsIsNullTest(); + // brokerMetrics is not null + doAnalysisBroker2brokerMetricsIsNotNullTest(); + } + + private void doAnalysisBroker2brokerMetricsIsNullTest() { + AnalysisBrokerDTO analysisBrokerDTO = analysisService.doAnalysisBroker( + INVALID_CLUSTER_ID, + REAL_BROKER_ID_IN_ZK + ); + Assert.assertNotNull(analysisBrokerDTO); + Assert.assertEquals(analysisBrokerDTO.getBrokerId(), REAL_BROKER_ID_IN_ZK); + Assert.assertEquals(analysisBrokerDTO.getClusterId(), INVALID_CLUSTER_ID); + Assert.assertNull(analysisBrokerDTO.getBytesIn()); + } + + private void doAnalysisBroker2brokerMetricsIsNotNullTest() { + AnalysisBrokerDTO analysisBrokerDTO = analysisService.doAnalysisBroker( + REAL_CLUSTER_ID_IN_MYSQL, + REAL_BROKER_ID_IN_ZK + ); + Assert.assertNotNull(analysisBrokerDTO); + Assert.assertEquals(analysisBrokerDTO.getBrokerId(), REAL_BROKER_ID_IN_ZK); + Assert.assertEquals(analysisBrokerDTO.getClusterId(), REAL_CLUSTER_ID_IN_MYSQL); + Assert.assertNotNull(analysisBrokerDTO.getBytesIn()); + } +} diff --git a/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/ConsumerServiceTest.java b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/ConsumerServiceTest.java new file mode 100644 index 00000000..21162aee --- /dev/null +++ b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/ConsumerServiceTest.java @@ -0,0 +1,264 @@ +package com.xiaojukeji.kafka.manager.service.service; + +import com.xiaojukeji.kafka.manager.common.bizenum.OffsetLocationEnum; +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionOffsetDTO; +import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumeDetailDTO; +import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumerGroup; +import com.xiaojukeji.kafka.manager.common.entity.ao.consumer.ConsumerGroupSummary; +import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; +import com.xiaojukeji.kafka.manager.service.config.BaseTest; + +import org.springframework.beans.factory.annotation.Autowired; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.Arrays; +import java.util.Date; +import java.util.List; +import java.util.Map; + +/** + * 测试消费组消费情况需要保证集群中存在消费组 + * @author xuguang + * @Date 2021/12/23 + */ +public class ConsumerServiceTest extends BaseTest { + + private final static Long REAL_CLUSTER_ID_IN_MYSQL = 1L; + + private final static Integer REAL_BROKER_ID_IN_ZK = 1; + + private final static Long INVALID_CLUSTER_ID = -1L; + + /** + * 集群共包括三个broker:1,2,3, 该topic 1分区 1副本因子,在broker1上 + */ + private final static String REAL_TOPIC1_IN_ZK = "moduleTest"; + + /** + * 集群共包括三个broker:1,2,3, 该topic 2分区 3副本因子,在broker1,2,3上 + */ + private final static String REAL_TOPIC2_IN_ZK = "xgTest"; + + private final static String INVALID_TOPIC = "xxxxxx"; + + private final static String REAL_CONSUMER_GROUP_NAME = "moduleTestGroup"; + + private final static String INVALID_CONSUMER_GROUP_NAME = "xxxxxxxx"; + + @Autowired + private ConsumerService consumerService; + + private ClusterDO getClusterDO() { + ClusterDO clusterDO = new ClusterDO(); + clusterDO.setId(1L); + clusterDO.setClusterName("LogiKM_moduleTest"); + clusterDO.setZookeeper("10.190.46.198:2181,10.190.14.237:2181,10.190.50.65:2181/xg"); + clusterDO.setBootstrapServers("10.190.46.198:9093,10.190.14.237:9093,10.190.50.65:9093"); + clusterDO.setSecurityProperties("{ \t\"security.protocol\": \"SASL_PLAINTEXT\", \t\"sasl.mechanism\": \"PLAIN\", \t\"sasl.jaas.config\": \"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\"dkm_admin\\\" password=\\\"km_kMl4N8as1Kp0CCY\\\";\" }"); + clusterDO.setStatus(1); + clusterDO.setGmtCreate(new Date()); + clusterDO.setGmtModify(new Date()); + return clusterDO; + } + + private ConsumerGroup getConsumerGroup() { + return new ConsumerGroup( + REAL_CLUSTER_ID_IN_MYSQL, + REAL_CONSUMER_GROUP_NAME, + OffsetLocationEnum.BROKER); + } + + private PartitionOffsetDTO getPartitionOffsetDTO() { + PartitionOffsetDTO partitionOffsetDTO = new PartitionOffsetDTO(); + partitionOffsetDTO.setOffset(0L); + partitionOffsetDTO.setPartitionId(0); + return partitionOffsetDTO; + } + + @Test(description = "测试获取消费组列表") + public void getConsumerGroupListTest() { + List consumerGroupList = consumerService.getConsumerGroupList(REAL_CLUSTER_ID_IN_MYSQL); + Assert.assertFalse(consumerGroupList.isEmpty()); + Assert.assertTrue(consumerGroupList.stream().allMatch(consumerGroup -> + consumerGroup.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL))); + } + + @Test(description = "测试查询消费Topic的消费组") + public void getConsumerGroupListWithTopicTest() { + List consumerGroupList = consumerService.getConsumerGroupList( + REAL_CLUSTER_ID_IN_MYSQL, + REAL_TOPIC1_IN_ZK + ); + Assert.assertFalse(consumerGroupList.isEmpty()); + Assert.assertTrue(consumerGroupList.stream().allMatch(consumerGroup -> + consumerGroup.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL))); + } + + @Test(description = "测试获取消费Topic的消费组概要信息") + public void getConsumerGroupSummariesTest() { + // result is empty + getConsumerGroupSummaries2EmptyTest(); + // result is not empty + getConsumerGroupSummaries2NotEmptyTest(); + } + + private void getConsumerGroupSummaries2EmptyTest() { + List consumerGroupSummaries = consumerService.getConsumerGroupSummaries( + REAL_CLUSTER_ID_IN_MYSQL, + INVALID_TOPIC + ); + Assert.assertTrue(consumerGroupSummaries.isEmpty()); + } + + private void getConsumerGroupSummaries2NotEmptyTest() { + List consumerGroupSummaries = consumerService.getConsumerGroupSummaries( + REAL_CLUSTER_ID_IN_MYSQL, + REAL_TOPIC1_IN_ZK + ); + Assert.assertFalse(consumerGroupSummaries.isEmpty()); + } + + @Test(description = "测试查询消费详情") + public void getConsumeDetail() { + // result is empty + getConsumeDetail2Empty(); + // result is not empty + getConsumeDetail2NotEmpty(); + } + + private void getConsumeDetail2Empty() { + ClusterDO clusterDO = getClusterDO(); + List consumeDetail1 = + consumerService.getConsumeDetail(clusterDO, INVALID_TOPIC, null); + Assert.assertTrue(consumeDetail1.isEmpty()); + + ConsumerGroup consumerGroup = getConsumerGroup(); + consumerGroup.setOffsetStoreLocation(null); + List consumeDetail2 = + consumerService.getConsumeDetail(clusterDO, REAL_TOPIC1_IN_ZK, consumerGroup); + Assert.assertTrue(consumeDetail2.isEmpty()); + } + + private void getConsumeDetail2NotEmpty() { + ClusterDO clusterDO = getClusterDO(); + ConsumerGroup consumerGroup = getConsumerGroup(); + List consumeDetail1 = + consumerService.getConsumeDetail(clusterDO, REAL_TOPIC1_IN_ZK, consumerGroup); + Assert.assertFalse(consumeDetail1.isEmpty()); + } + + @Test(description = "测试获取消费组消费的Topic列表") + public void getConsumerGroupConsumedTopicListTest() { + // result is empty + getConsumerGroupConsumedTopicList2Empty(); + // result is not empty + getConsumerGroupConsumedTopicList2NotEmpty(); + } + + private void getConsumerGroupConsumedTopicList2Empty() { + List list = consumerService.getConsumerGroupConsumedTopicList( + null, + null, + null); + Assert.assertTrue(list.isEmpty()); + } + + private void getConsumerGroupConsumedTopicList2NotEmpty() { + List list = consumerService.getConsumerGroupConsumedTopicList( + REAL_CLUSTER_ID_IN_MYSQL, + REAL_CONSUMER_GROUP_NAME, + "broker"); + Assert.assertFalse(list.isEmpty()); + } + + @Test(description = "测试获取消费者offset") + public void getConsumerOffsetTest() { + // result is null + getConsumerOffset2NullTest(); + // result is not null + getConsumerOffset2NotNullTest(); + } + + private void getConsumerOffset2NullTest() { + Map consumerOffset1 = consumerService.getConsumerOffset(null, null, null); + Assert.assertNull(consumerOffset1); + + ClusterDO clusterDO = getClusterDO(); + ConsumerGroup consumerGroup = getConsumerGroup(); + consumerGroup.setOffsetStoreLocation(null); + Map consumerOffset2 = consumerService.getConsumerOffset( + clusterDO, + REAL_TOPIC1_IN_ZK, + consumerGroup + ); + Assert.assertNull(consumerOffset2); + } + + private void getConsumerOffset2NotNullTest() { + ClusterDO clusterDO = getClusterDO(); + ConsumerGroup consumerGroup = getConsumerGroup(); + Map consumerOffset = consumerService.getConsumerOffset( + clusterDO, + REAL_TOPIC1_IN_ZK, + consumerGroup + ); + Assert.assertNotNull(consumerOffset); + Assert.assertFalse(consumerOffset.isEmpty()); + } + + @Test(description = "测试获取每个集群消费组的个数") + public void getConsumerGroupNumMapTest() { + ClusterDO clusterDO = getClusterDO(); + Map map = consumerService.getConsumerGroupNumMap(Arrays.asList(clusterDO)); + Assert.assertFalse(map.isEmpty()); + Assert.assertTrue(clusterDO.getId() >= 0); + } + + @Test(description = "验证消费组是否存在") + public void checkConsumerGroupExistTest() { + // 不存在 + checkConsumerGroupExist2FalseTest(); + // 存在 + checkConsumerGroupExist2TrueTest(); + } + + private void checkConsumerGroupExist2FalseTest() { + boolean result = consumerService.checkConsumerGroupExist( + OffsetLocationEnum.BROKER, + REAL_CLUSTER_ID_IN_MYSQL, + REAL_TOPIC1_IN_ZK, + INVALID_CONSUMER_GROUP_NAME + ); + Assert.assertFalse(result); + } + + private void checkConsumerGroupExist2TrueTest() { + boolean result = consumerService.checkConsumerGroupExist( + OffsetLocationEnum.BROKER, + REAL_CLUSTER_ID_IN_MYSQL, + REAL_TOPIC1_IN_ZK, + REAL_CONSUMER_GROUP_NAME + ); + Assert.assertTrue(result); + } + + @Test(description = "测试重置offset") + public void resetConsumerOffsetTest() { + ClusterDO clusterDO = getClusterDO(); + ConsumerGroup consumerGroup = getConsumerGroup(); + PartitionOffsetDTO partitionOffsetDTO1 = getPartitionOffsetDTO(); + List results = consumerService.resetConsumerOffset( + clusterDO, + REAL_TOPIC1_IN_ZK, + consumerGroup, + Arrays.asList(partitionOffsetDTO1) + ); + Assert.assertFalse(results.isEmpty()); + Assert.assertTrue(results.stream().allMatch(result -> + result.getCode() == ResultStatus.SUCCESS.getCode())); + } + +} diff --git a/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/JmxServiceTest.java b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/JmxServiceTest.java new file mode 100644 index 00000000..c813c275 --- /dev/null +++ b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/JmxServiceTest.java @@ -0,0 +1,386 @@ +package com.xiaojukeji.kafka.manager.service.service; + +import com.xiaojukeji.kafka.manager.common.bizenum.KafkaClientEnum; +import com.xiaojukeji.kafka.manager.common.constant.KafkaMetricsCollections; +import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionAttributeDTO; +import com.xiaojukeji.kafka.manager.common.entity.metrics.BrokerMetrics; +import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics; +import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.PartitionState; +import com.xiaojukeji.kafka.manager.service.config.BaseTest; +import org.apache.kafka.common.TopicPartition; +import org.springframework.beans.factory.annotation.Autowired; +import org.testng.Assert; +import org.testng.annotations.Test; + +import java.util.*; + +/** + * @author xuguang + * @Date 2021/12/14 + */ +public class JmxServiceTest extends BaseTest { + /** + * 集群共包括三个broker:1,2,3, 该topic 1分区 1副本因子,在broker1上 + */ + private final static String REAL_TOPIC1_IN_ZK = "moduleTest"; + + /** + * 集群共包括三个broker:1,2,3, 该topic 2分区 3副本因子,在broker1,2,3上 + */ + private final static String REAL_TOPIC2_IN_ZK = "xgTest"; + + private final static String INVALID_TOPIC = "xxxxx"; + + private final static String ZK_DEFAULT_TOPIC = "_consumer_offsets"; + + private final static String NO_OFFSET_CHANGE_TOPIC_IN_ZK = "NoOffsetChangeTopic"; + + private final static Long REAL_CLUSTER_ID_IN_MYSQL = 1L; + + private final static Integer REAL_BROKER_ID_IN_ZK = 1; + + private final static Integer INVALID_BROKER_ID = -1; + + private final static Long INVALID_CLUSTER_ID = -1L; + + private final static Integer INVALID_PARTITION_ID = -1; + + private final static String CLIENT_ID = "dkm_admin.moduleTest"; + + private final static Integer INVALID_METRICS_CODE = -1; + + @Autowired + private JmxService jmxService; + + private PartitionState getPartitionState() { + PartitionState partitionState = new PartitionState(); + partitionState.setPartitionId(0); + partitionState.setLeader(2); + return partitionState; + } + + @Test + public void getBrokerMetricsTest() { + // 结果为空 + getBrokerMetrics2NullTest(); + // mbeanV2ListEmpty + getBrokerMetrics2mbeanV2ListEmptyTest(); + // 获取成功 + getBrokerMetrics2SuccessTest(); + } + + private void getBrokerMetrics2NullTest() { + BrokerMetrics brokerMetrics1 = jmxService.getBrokerMetrics(null, null, null); + Assert.assertNull(brokerMetrics1); + + BrokerMetrics brokerMetrics2 = jmxService.getBrokerMetrics( + REAL_CLUSTER_ID_IN_MYSQL, + INVALID_BROKER_ID, + KafkaMetricsCollections.BROKER_ANALYSIS_METRICS); + Assert.assertNull(brokerMetrics2); + } + + private void getBrokerMetrics2mbeanV2ListEmptyTest() { + BrokerMetrics brokerMetrics2 = jmxService.getBrokerMetrics( + REAL_CLUSTER_ID_IN_MYSQL, + REAL_BROKER_ID_IN_ZK, + -1); + Assert.assertNotNull(brokerMetrics2); + Assert.assertEquals(brokerMetrics2.getClusterId(), REAL_CLUSTER_ID_IN_MYSQL); + Assert.assertEquals(brokerMetrics2.getBrokerId(), REAL_BROKER_ID_IN_ZK); + Assert.assertTrue(brokerMetrics2.getMetricsMap().isEmpty()); + } + + private void getBrokerMetrics2SuccessTest() { + BrokerMetrics brokerMetrics2 = jmxService.getBrokerMetrics( + REAL_CLUSTER_ID_IN_MYSQL, + REAL_BROKER_ID_IN_ZK, + KafkaMetricsCollections.BROKER_ANALYSIS_METRICS); + Assert.assertNotNull(brokerMetrics2); + Assert.assertEquals(brokerMetrics2.getClusterId(), REAL_CLUSTER_ID_IN_MYSQL); + Assert.assertEquals(brokerMetrics2.getBrokerId(), REAL_BROKER_ID_IN_ZK); + Assert.assertFalse(brokerMetrics2.getMetricsMap().isEmpty()); + } + + @Test + public void getTopicMetricsWithBrokerIdTest() { + // 结果为空 + getTopicMetricsWithBrokerId2nullTest(); + // 获取的metrics为空 + getTopicMetricsWithBrokerId2MetricsIsNullTest(); + // 获取指标成功 + getTopicMetricsWithBrokerId2SuccessTest(); + } + + private void getTopicMetricsWithBrokerId2nullTest() { + TopicMetrics topicMetrics1 = jmxService.getTopicMetrics( + REAL_CLUSTER_ID_IN_MYSQL, + REAL_BROKER_ID_IN_ZK, + REAL_TOPIC1_IN_ZK, + -1, true); + Assert.assertNull(topicMetrics1); + + TopicMetrics topicMetrics2 = jmxService.getTopicMetrics( + REAL_CLUSTER_ID_IN_MYSQL, + INVALID_BROKER_ID, + REAL_TOPIC1_IN_ZK, + KafkaMetricsCollections.BROKER_ANALYSIS_METRICS + , true); + Assert.assertNull(topicMetrics2); + } + + private void getTopicMetricsWithBrokerId2MetricsIsNullTest() { + // brokerId为3,不在该topic下 + TopicMetrics topicMetrics2 = jmxService.getTopicMetrics( + REAL_CLUSTER_ID_IN_MYSQL, + 3, + REAL_TOPIC1_IN_ZK, + KafkaMetricsCollections.BROKER_ANALYSIS_METRICS + , true); + Assert.assertNotNull(topicMetrics2); + Assert.assertEquals(topicMetrics2.getClusterId(), REAL_CLUSTER_ID_IN_MYSQL); + Assert.assertEquals(topicMetrics2.getTopicName(), REAL_TOPIC1_IN_ZK); + Assert.assertTrue(topicMetrics2.getMetricsMap().isEmpty()); + } + + private void getTopicMetricsWithBrokerId2SuccessTest() { + TopicMetrics topicMetrics2 = jmxService.getTopicMetrics( + REAL_CLUSTER_ID_IN_MYSQL, + REAL_BROKER_ID_IN_ZK, + REAL_TOPIC1_IN_ZK, + KafkaMetricsCollections.TOPIC_REQUEST_TIME_METRICS_TO_DB + , true); + Assert.assertNotNull(topicMetrics2); + Assert.assertEquals(topicMetrics2.getClusterId(), REAL_CLUSTER_ID_IN_MYSQL); + Assert.assertEquals(topicMetrics2.getTopicName(), REAL_TOPIC1_IN_ZK); + Assert.assertFalse(topicMetrics2.getMetricsMap().isEmpty()); + } + + @Test + public void getTopicMetricsWithoutBrokerId() { + // 返回为空 + getTopicMetricsWithoutBrokerId2Null(); + // add + getTopicMetricsWithoutBrokerId2Add(); + // max + getTopicMetricsWithoutBrokerId2Max(); + } + + private void getTopicMetricsWithoutBrokerId2Null() { + TopicMetrics topicMetrics = jmxService.getTopicMetrics( + REAL_CLUSTER_ID_IN_MYSQL, + INVALID_TOPIC, + KafkaMetricsCollections.TOPIC_METRICS_TO_DB + , true); + Assert.assertNull(topicMetrics); + } + + private void getTopicMetricsWithoutBrokerId2Add() { + TopicMetrics topicMetrics = jmxService.getTopicMetrics( + REAL_CLUSTER_ID_IN_MYSQL, + REAL_TOPIC1_IN_ZK, + KafkaMetricsCollections.TOPIC_REQUEST_TIME_METRICS_TO_DB + , true); + Assert.assertNotNull(topicMetrics); + Assert.assertNotNull(topicMetrics.getBrokerMetricsList()); + Assert.assertNotNull(topicMetrics.getMetricsMap()); + } + + private void getTopicMetricsWithoutBrokerId2Max() { + TopicMetrics topicMetrics = jmxService.getTopicMetrics( + REAL_CLUSTER_ID_IN_MYSQL, + REAL_TOPIC2_IN_ZK, + KafkaMetricsCollections.TOPIC_REQUEST_TIME_METRICS_TO_DB + , false); + Assert.assertNotNull(topicMetrics); + Assert.assertNotNull(topicMetrics.getBrokerMetricsList()); + Assert.assertNotNull(topicMetrics.getMetricsMap()); + } + + @Test(description = "测试获取集群下所有topic指标") + public void getTopicMetricsList() { + List topicMetrics = jmxService.getTopicMetrics( + REAL_CLUSTER_ID_IN_MYSQL, + KafkaMetricsCollections.TOPIC_REQUEST_TIME_METRICS_TO_DB + , false); + Assert.assertFalse(topicMetrics.isEmpty()); + Assert.assertTrue(topicMetrics.stream().allMatch(topicMetric -> + topicMetric.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL))); + } + + @Test(description = "测试获取broker版本") + public void getBrokerVersion() { + // 结果为空 + getBrokerVersion2Empty(); + // 结果不为空 + getBrokerVersion2NotEmpty(); + } + + private void getBrokerVersion2Empty() { + String brokerVersion = jmxService.getBrokerVersion( + REAL_CLUSTER_ID_IN_MYSQL, + INVALID_BROKER_ID); + Assert.assertEquals(brokerVersion, ""); + } + + private void getBrokerVersion2NotEmpty() { + String brokerVersion = jmxService.getBrokerVersion( + REAL_CLUSTER_ID_IN_MYSQL, + REAL_BROKER_ID_IN_ZK); + Assert.assertNotEquals(brokerVersion, ""); + } + + @Test(description = "获取客户端限流信息") + public void getTopicAppThrottleTest() { + // 结果为0 + getTopicAppThrottle2ZeroTest(); + // 结果不为0 + getTopicAppThrottle2NotZeroTest(); + } + + private void getTopicAppThrottle2ZeroTest() { + double topicAppThrottle = jmxService.getTopicAppThrottle( + REAL_CLUSTER_ID_IN_MYSQL, + INVALID_BROKER_ID, + "1", + KafkaClientEnum.FETCH_CLIENT); + Assert.assertEquals(topicAppThrottle, 0.0d); + } + + private void getTopicAppThrottle2NotZeroTest() { + double topicAppThrottle = jmxService.getTopicAppThrottle( + REAL_CLUSTER_ID_IN_MYSQL, + REAL_BROKER_ID_IN_ZK, + CLIENT_ID, + KafkaClientEnum.FETCH_CLIENT); + // 未设置限流,所以还是为0 + Assert.assertEquals(topicAppThrottle, 0.0d); + } + + @Test(description = "获取被限流信息") + public void getBrokerThrottleClientsTest() { + // 结果为空 + getBrokerThrottleClients2EmptyTest(); + // 构造限流client,返回结果不为空 + getBrokerThrottleClients2NotEmptyTest(); + } + + private void getBrokerThrottleClients2EmptyTest() { + Set brokerThrottleClients = jmxService.getBrokerThrottleClients( + REAL_CLUSTER_ID_IN_MYSQL, + INVALID_BROKER_ID, + KafkaClientEnum.FETCH_CLIENT); + Assert.assertTrue(brokerThrottleClients.isEmpty()); + } + + private void getBrokerThrottleClients2NotEmptyTest() { + Set brokerThrottleClients = jmxService.getBrokerThrottleClients( + REAL_CLUSTER_ID_IN_MYSQL, + REAL_BROKER_ID_IN_ZK, + KafkaClientEnum.FETCH_CLIENT); + Assert.assertFalse(brokerThrottleClients.isEmpty()); + } + + @Test(description = "测试获取topic消息压缩指标") + public void getTopicCodeCValueTest() { + // 结果为null + getTopicCodeCValue2NullTest(); + // 结果不为null + getTopicCodeCValue2SuccessTest(); + } + + private void getTopicCodeCValue2NullTest() { + String result = jmxService.getTopicCodeCValue(REAL_CLUSTER_ID_IN_MYSQL, INVALID_TOPIC); + Assert.assertNull(result); + } + + private void getTopicCodeCValue2SuccessTest() { + String result = jmxService.getTopicCodeCValue( + REAL_CLUSTER_ID_IN_MYSQL, + REAL_TOPIC2_IN_ZK); + Assert.assertNotNull(result); + } + + @Test(description = "测试从JMX中获取appId维度的的流量信息") + public void getTopicAppMetricsTest() { + // result is empty + getTopicAppMetrics2Empty(); + // result is not empty + getTopicAppMetrics2NotEmpty(); + } + + private void getTopicAppMetrics2Empty() { + List topicAppMetrics = jmxService.getTopicAppMetrics( + REAL_CLUSTER_ID_IN_MYSQL, + INVALID_METRICS_CODE); + Assert.assertTrue(topicAppMetrics.isEmpty()); + + List topicAppMetrics2 = jmxService.getTopicAppMetrics( + INVALID_CLUSTER_ID, + KafkaMetricsCollections.APP_TOPIC_METRICS_TO_DB); + Assert.assertTrue(topicAppMetrics2.isEmpty()); + } + + private void getTopicAppMetrics2NotEmpty() { + List topicAppMetrics = jmxService.getTopicAppMetrics( + REAL_CLUSTER_ID_IN_MYSQL, + KafkaMetricsCollections.APP_TOPIC_METRICS_TO_DB + ); + Assert.assertFalse(topicAppMetrics.isEmpty()); + } + + @Test + public void getBrokerTopicLocationTest() { + // result is empty + getBrokerTopicLocation2EmptyTest(); + // result is not empty + getBrokerTopicLocation2NotEmptyTest(); + } + + private void getBrokerTopicLocation2EmptyTest() { + Map brokerTopicLocation = jmxService.getBrokerTopicLocation( + REAL_CLUSTER_ID_IN_MYSQL, + INVALID_BROKER_ID + ); + Assert.assertTrue(brokerTopicLocation.isEmpty()); + } + + private void getBrokerTopicLocation2NotEmptyTest() { + Map brokerTopicLocation = jmxService.getBrokerTopicLocation( + REAL_CLUSTER_ID_IN_MYSQL, + 2 + ); + Assert.assertFalse(brokerTopicLocation.isEmpty()); + } + + @Test + public void getPartitionAttributeTest() { + // result is empty + getPartitionAttribute2EmptyTest(); + // result is not empty + getPartitionAttribute2NotEmptyTest(); + } + + private void getPartitionAttribute2EmptyTest() { + Map list = jmxService.getPartitionAttribute( + REAL_CLUSTER_ID_IN_MYSQL, + REAL_TOPIC2_IN_ZK, + Collections.emptyList()); + Assert.assertTrue(list.isEmpty()); + } + + private void getPartitionAttribute2NotEmptyTest() { + // 需要确定leader所在broker + PartitionState partitionState1 = getPartitionState(); + PartitionState partitionState2 = getPartitionState(); + partitionState2.setLeader(3); + partitionState2.setPartitionId(1); + + Map list = jmxService.getPartitionAttribute( + REAL_CLUSTER_ID_IN_MYSQL, + REAL_TOPIC2_IN_ZK, + Arrays.asList(partitionState1, partitionState1, partitionState2) + ); + Assert.assertFalse(list.isEmpty()); + } +} diff --git a/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/LogicalClusterServiceTest.java b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/LogicalClusterServiceTest.java index 92d62c27..73a8ae3f 100644 --- a/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/LogicalClusterServiceTest.java +++ b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/LogicalClusterServiceTest.java @@ -31,6 +31,10 @@ import java.util.*; */ public class LogicalClusterServiceTest extends BaseTest { + private final static Long INVALID_CLUSTER_ID = -1L; + + private final static Long REAL_CLUSTER_ID_IN_MYSQL = 1L; + @Autowired @InjectMocks private LogicalClusterService logicalClusterService; @@ -52,8 +56,8 @@ public class LogicalClusterServiceTest extends BaseTest { @DataProvider(name = "provideLogicalClusterDO") public Object[][] provideLogicalClusterDO() { LogicalClusterDO logicalClusterDO = new LogicalClusterDO(); - logicalClusterDO.setId(100L); - logicalClusterDO.setClusterId(1L); + logicalClusterDO.setId(INVALID_CLUSTER_ID); + logicalClusterDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL); logicalClusterDO.setIdentification("moduleTestLogicalCluster"); logicalClusterDO.setName("moduleTestLogicalCluster"); logicalClusterDO.setMode(1); @@ -66,8 +70,8 @@ public class LogicalClusterServiceTest extends BaseTest { private LogicalClusterDO getLogicalClusterDO() { LogicalClusterDO logicalClusterDO = new LogicalClusterDO(); - logicalClusterDO.setId(100L); - logicalClusterDO.setClusterId(1L); + logicalClusterDO.setId(INVALID_CLUSTER_ID); + logicalClusterDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL); logicalClusterDO.setIdentification("moduleTestLogicalCluster"); logicalClusterDO.setName("moduleTestLogicalCluster"); logicalClusterDO.setMode(0); @@ -120,7 +124,7 @@ public class LogicalClusterServiceTest extends BaseTest { Assert.assertEquals(result1.getCode(), ResultStatus.RESOURCE_ALREADY_USED.getCode()); // regionList为空情况 - logicalClusterDO.setClusterId(1L); + logicalClusterDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL); logicalClusterDO.setRegionList(""); ResultStatus result2 = logicalClusterService.createLogicalCluster(logicalClusterDO); Assert.assertEquals(result2.getCode(), ResultStatus.RESOURCE_ALREADY_USED.getCode()); @@ -135,7 +139,7 @@ public class LogicalClusterServiceTest extends BaseTest { LogicalClusterDO logicalClusterDO = getLogicalClusterDO(); Mockito.when(logicalClusterDao.insert(Mockito.any())).thenReturn(1); // 不存在该物理集群情况 - logicalClusterDO.setClusterId(100L); + logicalClusterDO.setClusterId(INVALID_CLUSTER_ID); ResultStatus result1 = logicalClusterService.createLogicalCluster(logicalClusterDO); Assert.assertNotEquals(result1.getCode(), ResultStatus.RESOURCE_ALREADY_USED.getCode()); Assert.assertEquals(result1.getCode(), ResultStatus.SUCCESS.getCode()); @@ -205,7 +209,7 @@ public class LogicalClusterServiceTest extends BaseTest { private void deleteById2ResourceNotExistTest() { Mockito.when(logicalClusterDao.deleteById(Mockito.anyLong())).thenReturn(-1); - ResultStatus resultStatus = logicalClusterService.deleteById(100L); + ResultStatus resultStatus = logicalClusterService.deleteById(INVALID_CLUSTER_ID); Assert.assertEquals(resultStatus.getCode(), ResultStatus.RESOURCE_NOT_EXIST.getCode()); } @@ -235,7 +239,7 @@ public class LogicalClusterServiceTest extends BaseTest { @Test(dataProvider = "provideLogicalClusterDO", description = "修改集群时无对应逻辑集群") public void updateById2ResourceNotExistTest(LogicalClusterDO logicalClusterDO) { - logicalClusterDO.setId(100L); + logicalClusterDO.setId(INVALID_CLUSTER_ID); ResultStatus resultStatus2 = logicalClusterService.updateById(logicalClusterDO); Assert.assertEquals(resultStatus2.getCode(), ResultStatus.RESOURCE_NOT_EXIST.getCode()); } @@ -250,7 +254,7 @@ public class LogicalClusterServiceTest extends BaseTest { Assert.assertEquals(result1.getCode(), ResultStatus.RESOURCE_ALREADY_USED.getCode()); // regionList为空情况 - logicalClusterDO.setClusterId(1L); + logicalClusterDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL); logicalClusterDO.setRegionList(""); ResultStatus result2 = logicalClusterService.updateById(logicalClusterDO); Assert.assertEquals(result2.getCode(), ResultStatus.RESOURCE_ALREADY_USED.getCode()); @@ -315,7 +319,7 @@ public class LogicalClusterServiceTest extends BaseTest { } private void getLogicalCluster2NullTest() { - LogicalCluster logicalCluster = logicalClusterService.getLogicalCluster(100L); + LogicalCluster logicalCluster = logicalClusterService.getLogicalCluster(INVALID_CLUSTER_ID); Assert.assertNull(logicalCluster); } diff --git a/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/ReassignServiceTest.java b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/ReassignServiceTest.java new file mode 100644 index 00000000..a517c05d --- /dev/null +++ b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/ReassignServiceTest.java @@ -0,0 +1,458 @@ +package com.xiaojukeji.kafka.manager.service.service; + +import com.xiaojukeji.kafka.manager.common.bizenum.TaskStatusReassignEnum; +import com.xiaojukeji.kafka.manager.common.bizenum.TopicReassignActionEnum; +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.ao.reassign.ReassignStatus; +import com.xiaojukeji.kafka.manager.common.entity.dto.op.reassign.ReassignExecDTO; +import com.xiaojukeji.kafka.manager.common.entity.dto.op.reassign.ReassignExecSubDTO; +import com.xiaojukeji.kafka.manager.common.entity.dto.op.reassign.ReassignTopicDTO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.ReassignTaskDO; +import com.xiaojukeji.kafka.manager.dao.ReassignTaskDao; +import com.xiaojukeji.kafka.manager.service.config.BaseTest; +import kafka.common.TopicAndPartition; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.springframework.beans.factory.annotation.Autowired; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.*; + +/** + * @author xuguang + * @Date 2021/12/14 + */ +public class ReassignServiceTest extends BaseTest { + + /** + * 集群共包括三个broker:1,2,3, 该topic 2分区 3副本因子,在broker1,2,3上 + */ + private final static String REAL_TOPIC2_IN_ZK = "xgTest"; + + private final static String ADMIN_OPERATOR = "admin"; + + @Autowired + @InjectMocks + private ReassignService reassignService; + + @Mock + private RegionService regionService; + + @Mock + private ClusterService clusterService; + + @Mock + private ReassignTaskDao reassignTaskDao; + + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + } + + private final static String ZOOKEEPER_ADDRESS = "10.190.46.198:2181,10.190.14.237:2181,10.190.50.65:2181/xg"; + + private final static String REASSIGNMENTJSON = + "{ \"version\": 1, \"partitions\": [ { \"topic\": \"reassignTest\", \"partition\": 1, \"replicas\": [ 1,2,3 ], \"log_dirs\": [ \"any\",\"any\",\"any\" ] }, { \"topic\": \"reassignTest\", \"partition\": 0, \"replicas\": [ 1,2,3 ], \"log_dirs\": [ \"any\",\"any\",\"any\" ] } ] }"; + + private ReassignTopicDTO getReassignTopicDTO() { + ReassignTopicDTO reassignTopicDTO = new ReassignTopicDTO(); + reassignTopicDTO.setClusterId(1L); + reassignTopicDTO.setTopicName(REAL_TOPIC2_IN_ZK); + reassignTopicDTO.setBrokerIdList(Arrays.asList(2,3)); + reassignTopicDTO.setRegionId(2L); + reassignTopicDTO.setPartitionIdList(Arrays.asList(0, 1)); + reassignTopicDTO.setThrottle(100000L); + reassignTopicDTO.setMaxThrottle(100000L); + reassignTopicDTO.setMinThrottle(100000L); + reassignTopicDTO.setOriginalRetentionTime(10000L); + reassignTopicDTO.setReassignRetentionTime(10000L); + reassignTopicDTO.setBeginTime(100000L); + reassignTopicDTO.setDescription(""); + return reassignTopicDTO; + } + + private ReassignExecDTO getReassignExecDTO() { + ReassignExecDTO reassignExecDTO = new ReassignExecDTO(); + reassignExecDTO.setTaskId(1L); + reassignExecDTO.setAction("modify"); + reassignExecDTO.setBeginTime(0L); + return reassignExecDTO; + } + + private ReassignTaskDO getReassignTaskDO() { + ReassignTaskDO reassignTaskDO = new ReassignTaskDO(); + reassignTaskDO.setId(1L); + reassignTaskDO.setClusterId(1L); + reassignTaskDO.setStatus(0); + reassignTaskDO.setTaskId(1L); + reassignTaskDO.setTopicName(REAL_TOPIC2_IN_ZK); + reassignTaskDO.setPartitions("0,1,2"); + reassignTaskDO.setReassignmentJson(""); + reassignTaskDO.setRealThrottle(1000L); + reassignTaskDO.setMaxThrottle(1000L); + reassignTaskDO.setMinThrottle(1000L); + reassignTaskDO.setBeginTime(new Date()); + reassignTaskDO.setSrcBrokers("0"); + reassignTaskDO.setDestBrokers("1"); + reassignTaskDO.setReassignRetentionTime(1000L); + reassignTaskDO.setOriginalRetentionTime(1000L); + reassignTaskDO.setDescription("测试迁移任务"); + reassignTaskDO.setOperator(ADMIN_OPERATOR); + return reassignTaskDO; + } + + private ReassignExecSubDTO getReassignExecSubDTO() { + ReassignExecSubDTO reassignExecSubDTO = new ReassignExecSubDTO(); + reassignExecSubDTO.setSubTaskId(1L); + reassignExecSubDTO.setAction("modify"); + reassignExecSubDTO.setThrottle(100000L); + reassignExecSubDTO.setMaxThrottle(100000L); + reassignExecSubDTO.setMinThrottle(100000L); + return reassignExecSubDTO; + } + + private ClusterDO getClusterDO() { + ClusterDO clusterDO = new ClusterDO(); + clusterDO.setId(1L); + clusterDO.setClusterName("LogiKM_moduleTest"); + clusterDO.setZookeeper("10.190.46.198:2181,10.190.14.237:2181,10.190.50.65:2181/xg"); + clusterDO.setBootstrapServers("10.190.46.198:9093,10.190.14.237:9093,10.190.50.65:9093"); + clusterDO.setSecurityProperties("{ \t\"security.protocol\": \"SASL_PLAINTEXT\", \t\"sasl.mechanism\": \"PLAIN\", \t\"sasl.jaas.config\": \"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\"dkm_admin\\\" password=\\\"km_kMl4N8as1Kp0CCY\\\";\" }"); + clusterDO.setStatus(1); + clusterDO.setGmtCreate(new Date()); + clusterDO.setGmtModify(new Date()); + return clusterDO; + } + + @Test(description = "创建迁移任务") + public void createTaskTest() { + // 参数错误 + createTask2paramIllegalTest(); + // 物理集群不存在 + createTask2ClusterNotExistTest(); + // topic不存在 + createTask2TopicNotExistTest(); + // broker数量不足 + createTask2BrokerNumNotEnoughTest(); + // broker不存在 + createTask2BrokerNotExistTest(); + // broker数量不足, checkParamLegal()方法中 + createTask2BrokerNumNotEnough2Test(); + // 参数错误, checkParamLegal()方法中 + createTask2ParamIllegal2Test(); + // 分区为空 + createTask2PartitionIdListEmptyTest(); + // 分区不存在 + createTask2PartitionNotExistTest(); + // 创建任务成功 + createTask2SuccessTest(); + } + + private void createTask2paramIllegalTest() { + ResultStatus result = reassignService.createTask(Collections.emptyList(), ADMIN_OPERATOR); + Assert.assertEquals(result.getCode(), ResultStatus.PARAM_ILLEGAL.getCode()); + } + + private void createTask2ClusterNotExistTest() { + ReassignTopicDTO reassignTopicDTO = getReassignTopicDTO(); + reassignTopicDTO.setClusterId(-1L); + ResultStatus result = reassignService.createTask(Arrays.asList(reassignTopicDTO), ADMIN_OPERATOR); + Assert.assertEquals(result.getCode(), ResultStatus.CLUSTER_NOT_EXIST.getCode()); + } + + private void createTask2TopicNotExistTest() { + ReassignTopicDTO reassignTopicDTO = getReassignTopicDTO(); + reassignTopicDTO.setClusterId(1L); + reassignTopicDTO.setTopicName("xxx"); + ResultStatus result = reassignService.createTask(Arrays.asList(reassignTopicDTO), ADMIN_OPERATOR); + Assert.assertEquals(result.getCode(), ResultStatus.TOPIC_NOT_EXIST.getCode()); + } + + private void createTask2BrokerNumNotEnoughTest() { + Mockito.when(regionService.getFullBrokerIdList( + Mockito.anyLong(), Mockito.anyLong(), Mockito.anyList())).thenReturn(null); + + ReassignTopicDTO reassignTopicDTO = getReassignTopicDTO(); + reassignTopicDTO.setClusterId(1L); + reassignTopicDTO.setTopicName(REAL_TOPIC2_IN_ZK); + ResultStatus result = reassignService.createTask(Arrays.asList(reassignTopicDTO), ADMIN_OPERATOR); + Assert.assertEquals(result.getCode(), ResultStatus.BROKER_NUM_NOT_ENOUGH.getCode()); + } + + private void createTask2BrokerNotExistTest() { + Mockito.when(regionService.getFullBrokerIdList( + Mockito.anyLong(), Mockito.anyLong(), Mockito.anyList())).thenReturn(Arrays.asList(100, 2, 3)); + + ReassignTopicDTO reassignTopicDTO = getReassignTopicDTO(); + reassignTopicDTO.setClusterId(1L); + reassignTopicDTO.setTopicName(REAL_TOPIC2_IN_ZK); + ResultStatus result = reassignService.createTask(Arrays.asList(reassignTopicDTO), ADMIN_OPERATOR); + Assert.assertEquals(result.getCode(), ResultStatus.BROKER_NOT_EXIST.getCode()); + } + + private void createTask2BrokerNumNotEnough2Test() { + Mockito.when(regionService.getFullBrokerIdList( + Mockito.anyLong(), Mockito.anyLong(), Mockito.anyList())).thenReturn(Arrays.asList(2, 3)); + + ReassignTopicDTO reassignTopicDTO = getReassignTopicDTO(); + reassignTopicDTO.setClusterId(1L); + reassignTopicDTO.setTopicName(REAL_TOPIC2_IN_ZK); + ResultStatus result = reassignService.createTask(Arrays.asList(reassignTopicDTO), ADMIN_OPERATOR); + Assert.assertEquals(result.getCode(), ResultStatus.BROKER_NUM_NOT_ENOUGH.getCode()); + } + + private void createTask2ParamIllegal2Test() { + Mockito.when(regionService.getFullBrokerIdList( + Mockito.anyLong(), Mockito.anyLong(), Mockito.anyList())).thenReturn(Arrays.asList(1, 2, 3)); + + ReassignTopicDTO reassignTopicDTO = getReassignTopicDTO(); + reassignTopicDTO.setClusterId(1L); + reassignTopicDTO.setTopicName(REAL_TOPIC2_IN_ZK); + ResultStatus result = reassignService.createTask(Arrays.asList(reassignTopicDTO), ADMIN_OPERATOR); + Assert.assertEquals(result.getCode(), ResultStatus.PARAM_ILLEGAL.getCode()); + } + + private void createTask2PartitionIdListEmptyTest() { + Mockito.when(regionService.getFullBrokerIdList( + Mockito.anyLong(), Mockito.anyLong(), Mockito.anyList())).thenReturn(Arrays.asList(1, 2, 3)); + + ReassignTopicDTO reassignTopicDTO = getReassignTopicDTO(); + reassignTopicDTO.setClusterId(1L); + reassignTopicDTO.setTopicName(REAL_TOPIC2_IN_ZK); + reassignTopicDTO.setOriginalRetentionTime(168 * 3600000L); + reassignTopicDTO.setPartitionIdList(Collections.emptyList()); + ResultStatus result = reassignService.createTask(Arrays.asList(reassignTopicDTO), ADMIN_OPERATOR); + Assert.assertEquals(result.getCode(), ResultStatus.PARAM_ILLEGAL.getCode()); + } + + private void createTask2PartitionNotExistTest() { + Mockito.when(regionService.getFullBrokerIdList( + Mockito.anyLong(), Mockito.anyLong(), Mockito.anyList())).thenReturn(Arrays.asList(1, 2, 3)); + + ReassignTopicDTO reassignTopicDTO = getReassignTopicDTO(); + reassignTopicDTO.setClusterId(1L); + reassignTopicDTO.setTopicName(REAL_TOPIC2_IN_ZK); + reassignTopicDTO.setOriginalRetentionTime(168 * 3600000L); + reassignTopicDTO.setPartitionIdList(Arrays.asList(100, 0)); + ResultStatus result = reassignService.createTask(Arrays.asList(reassignTopicDTO), ADMIN_OPERATOR); + Assert.assertEquals(result.getCode(), ResultStatus.PARTITION_NOT_EXIST.getCode()); + } + + private void createTask2SuccessTest() { + Mockito.when(regionService.getFullBrokerIdList( + Mockito.anyLong(), Mockito.anyLong(), Mockito.anyList())).thenReturn(Arrays.asList(1, 2, 3)); + + ReassignTopicDTO reassignTopicDTO = getReassignTopicDTO(); + reassignTopicDTO.setTopicName(REAL_TOPIC2_IN_ZK); + reassignTopicDTO.setOriginalRetentionTime(168 * 3600000L); + ResultStatus result = reassignService.createTask(Arrays.asList(reassignTopicDTO), ADMIN_OPERATOR); + Assert.assertEquals(result.getCode(), ResultStatus.SUCCESS.getCode()); + } + + @Test(description = "测试获取迁移任务") + public void getTaskTest() { + // 测试获取成功 + getTaskTest2Success(); + // 测试获取失败 + getTaskTest2Exception(); + } + + private void getTaskTest2Success() { + ReassignTaskDO reassignTask = getReassignTaskDO(); + Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenReturn(Arrays.asList(reassignTask)); + + List task = reassignService.getTask(reassignTask.getTaskId()); + Assert.assertFalse(task.isEmpty()); + Assert.assertTrue(task.stream().allMatch(reassignTaskDO -> + reassignTaskDO.getTaskId().equals(reassignTask.getTaskId()) && + reassignTaskDO.getClusterId().equals(reassignTask.getClusterId()) && + reassignTaskDO.getStatus().equals(reassignTask.getStatus()))); + } + + private void getTaskTest2Exception() { + ReassignTaskDO reassignTask = getReassignTaskDO(); + Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenThrow(RuntimeException.class); + + List task = reassignService.getTask(reassignTask.getTaskId()); + Assert.assertNull(task); + } + + @Test(description = "修改迁移任务") + public void modifyTask() { + // operation forbidden + modifyTask2OperationForbiddenTest(); + // 修改成功 + modifyTask2Success(); + // mysqlError + modifyTask2MysqlError(); + // 任务不存在 + modifyTask2TaskNotExistTest(); + } + + private void modifyTask2TaskNotExistTest() { + Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenThrow(RuntimeException.class); + + ReassignExecDTO reassignExecDTO = getReassignExecDTO(); + reassignExecDTO.setTaskId(100L); + ResultStatus resultStatus = reassignService.modifyTask(reassignExecDTO, TopicReassignActionEnum.START); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.TASK_NOT_EXIST.getCode()); + } + + private void modifyTask2OperationForbiddenTest() { + ReassignTaskDO reassignTask = getReassignTaskDO(); + reassignTask.setStatus(1); + Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenReturn(Arrays.asList(reassignTask)); + + ReassignExecDTO reassignExecDTO = getReassignExecDTO(); + ResultStatus resultStatus1 = reassignService.modifyTask(reassignExecDTO, TopicReassignActionEnum.START); + Assert.assertEquals(resultStatus1.getCode(), ResultStatus.OPERATION_FORBIDDEN.getCode()); + } + + private void modifyTask2Success() { + ReassignTaskDO reassignTask = getReassignTaskDO(); + reassignTask.setStatus(0); + Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenReturn(Arrays.asList(reassignTask)); + + ReassignExecDTO reassignExecDTO = getReassignExecDTO(); + // cancel action + ResultStatus resultStatus1 = reassignService.modifyTask(reassignExecDTO, TopicReassignActionEnum.CANCEL); + Assert.assertEquals(resultStatus1.getCode(), ResultStatus.SUCCESS.getCode()); + + // start action + reassignTask.setStatus(0); + Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenReturn(Arrays.asList(reassignTask)); + ResultStatus resultStatus2 = reassignService.modifyTask(reassignExecDTO, TopicReassignActionEnum.START); + Assert.assertEquals(resultStatus2.getCode(), ResultStatus.SUCCESS.getCode()); + + // modify action + reassignTask.setStatus(0); + Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenReturn(Arrays.asList(reassignTask)); + ResultStatus resultStatus3 = reassignService.modifyTask(reassignExecDTO, TopicReassignActionEnum.MODIFY); + Assert.assertEquals(resultStatus3.getCode(), ResultStatus.SUCCESS.getCode()); + } + + private void modifyTask2MysqlError() { + ReassignTaskDO reassignTask = getReassignTaskDO(); + reassignTask.setStatus(0); + Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenReturn(Arrays.asList(reassignTask)); + + ReassignExecDTO reassignExecDTO = getReassignExecDTO(); + // cancel action + Mockito.doThrow(RuntimeException.class).when(reassignTaskDao).batchUpdate(Mockito.anyList()); + ResultStatus resultStatus1 = reassignService.modifyTask(reassignExecDTO, TopicReassignActionEnum.CANCEL); + Assert.assertEquals(resultStatus1.getCode(), ResultStatus.MYSQL_ERROR.getCode()); + + // start action + reassignTask.setStatus(0); + Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenReturn(Arrays.asList(reassignTask)); + Mockito.doThrow(RuntimeException.class).when(reassignTaskDao).batchUpdate(Mockito.anyList()); + ResultStatus resultStatus2 = reassignService.modifyTask(reassignExecDTO, TopicReassignActionEnum.START); + Assert.assertEquals(resultStatus2.getCode(), ResultStatus.MYSQL_ERROR.getCode()); + + // modify action + reassignTask.setStatus(0); + Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenReturn(Arrays.asList(reassignTask)); + Mockito.doThrow(RuntimeException.class).when(reassignTaskDao).batchUpdate(Mockito.anyList()); + ResultStatus resultStatus3 = reassignService.modifyTask(reassignExecDTO, TopicReassignActionEnum.MODIFY); + Assert.assertEquals(resultStatus3.getCode(), ResultStatus.MYSQL_ERROR.getCode()); + } + + @Test(description = "修改子任务测试") + public void modifySubTaskTest() { + // 任务不存在 + modifySubTask2TaskNotExist(); + // 修改任务成功 + modifySubTask2Success(); + // 修改任务失败 + modifySubTask2MysqlError(); + } + + private void modifySubTask2TaskNotExist() { + Mockito.when(reassignTaskDao.getSubTask(Mockito.anyLong())).thenReturn(null); + ResultStatus resultStatus = reassignService.modifySubTask(new ReassignExecSubDTO()); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.TASK_NOT_EXIST.getCode()); + } + + private void modifySubTask2Success() { + ReassignTaskDO reassignTask = getReassignTaskDO(); + Mockito.when(reassignTaskDao.getSubTask(Mockito.anyLong())).thenReturn(reassignTask); + ReassignExecSubDTO reassignExecSubDTO = getReassignExecSubDTO(); + ResultStatus resultStatus = reassignService.modifySubTask(reassignExecSubDTO); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.SUCCESS.getCode()); + } + + private void modifySubTask2MysqlError() { + ReassignTaskDO reassignTask = getReassignTaskDO(); + Mockito.when(reassignTaskDao.getSubTask(Mockito.anyLong())).thenReturn(reassignTask); + Mockito.when(reassignTaskDao.updateById(Mockito.any())).thenThrow(RuntimeException.class); + ReassignExecSubDTO reassignExecSubDTO = getReassignExecSubDTO(); + ResultStatus resultStatus = reassignService.modifySubTask(reassignExecSubDTO); + Assert.assertEquals(resultStatus.getCode(), ResultStatus.MYSQL_ERROR.getCode()); + } + + @Test() + public void getReassignTaskListTest() { + // 获取成功 + getReassignTaskList2Success(); + // 获取失败 + getReassignTaskList2Empty(); + } + + private void getReassignTaskList2Success() { + Mockito.when(reassignTaskDao.listAll()).thenReturn(Arrays.asList(new ReassignTaskDO())); + List reassignTaskList = reassignService.getReassignTaskList(); + Assert.assertFalse(reassignTaskList.isEmpty()); + } + + private void getReassignTaskList2Empty() { + Mockito.when(reassignTaskDao.listAll()).thenThrow(RuntimeException.class); + List reassignTaskList = reassignService.getReassignTaskList(); + Assert.assertTrue(reassignTaskList.isEmpty()); + } + + @Test + public void getReassignStatusTest() { + // 获取成功 + getReassignStatus2Success(); + // task不存在 + getReassignStatus2TaskNotExistTest(); + } + + private void getReassignStatus2TaskNotExistTest() { + Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenThrow(RuntimeException.class); + Result> reassignStatus = reassignService.getReassignStatus(1L); + Assert.assertEquals(reassignStatus.getCode(), ResultStatus.TASK_NOT_EXIST.getCode()); + } + + private void getReassignStatus2Success() { + ClusterDO clusterDO1 = getClusterDO(); + ClusterDO clusterDO2 = getClusterDO(); + clusterDO2.setId(100L); + Map map = new HashMap<>(); + map.put(clusterDO1.getId(), clusterDO1); + map.put(clusterDO2.getId(), clusterDO2); + Mockito.when(clusterService.listMap()).thenReturn(map); + + ReassignTaskDO reassignTaskDO1 = getReassignTaskDO(); + ReassignTaskDO reassignTaskDO2 = getReassignTaskDO(); + reassignTaskDO2.setStatus(TaskStatusReassignEnum.RUNNING.getCode()); + + Mockito.when(reassignTaskDao.getByTaskId(Mockito.anyLong())).thenReturn(Arrays.asList(reassignTaskDO1, reassignTaskDO2)); + Result> reassignStatus = reassignService.getReassignStatus(1L); + Assert.assertFalse(reassignStatus.getData().isEmpty()); + Assert.assertEquals(reassignStatus.getCode(), ResultStatus.SUCCESS.getCode()); + } + + @Test + public void verifyAssignmenTest() { + Map map = reassignService.verifyAssignment(ZOOKEEPER_ADDRESS, REASSIGNMENTJSON); + Assert.assertFalse(map.isEmpty()); + } + +} diff --git a/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/TopicServiceTest.java b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/TopicServiceTest.java new file mode 100644 index 00000000..3570aee6 --- /dev/null +++ b/kafka-manager-core/src/test/java/com/xiaojukeji/kafka/manager/service/service/TopicServiceTest.java @@ -0,0 +1,741 @@ +package com.xiaojukeji.kafka.manager.service.service; + +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; +import com.xiaojukeji.kafka.manager.common.bizenum.OffsetPosEnum; +import com.xiaojukeji.kafka.manager.common.bizenum.TopicOffsetChangedEnum; +import com.xiaojukeji.kafka.manager.common.constant.TopicSampleConstant; +import com.xiaojukeji.kafka.manager.common.entity.Result; +import com.xiaojukeji.kafka.manager.common.entity.ResultStatus; +import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionAttributeDTO; +import com.xiaojukeji.kafka.manager.common.entity.ao.PartitionOffsetDTO; +import com.xiaojukeji.kafka.manager.common.entity.ao.topic.*; +import com.xiaojukeji.kafka.manager.common.entity.dto.normal.TopicDataSampleDTO; +import com.xiaojukeji.kafka.manager.common.entity.metrics.BaseMetrics; +import com.xiaojukeji.kafka.manager.common.entity.metrics.TopicMetrics; +import com.xiaojukeji.kafka.manager.common.entity.pojo.ClusterDO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicDO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.TopicMetricsDO; +import com.xiaojukeji.kafka.manager.common.entity.pojo.gateway.AppDO; +import com.xiaojukeji.kafka.manager.common.zookeeper.znode.brokers.PartitionState; +import com.xiaojukeji.kafka.manager.dao.TopicRequestMetricsDao; +import com.xiaojukeji.kafka.manager.service.config.BaseTest; +import com.xiaojukeji.kafka.manager.service.service.gateway.AppService; +import org.apache.kafka.common.TopicPartition; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.springframework.beans.factory.annotation.Autowired; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.*; + +/** + * @author xuguang + * @Date 2021/12/20 + */ +public class TopicServiceTest extends BaseTest { + + /** + * 集群共包括三个broker:1,2,3, 该topic 1分区 1副本因子,在broker1上 + */ + private final static String REAL_TOPIC1_IN_ZK = "moduleTest"; + + /** + * 集群共包括三个broker:1,2,3, 该topic 2分区 3副本因子,在broker1,2,3上 + */ + private final static String REAL_TOPIC2_IN_ZK = "xgTest"; + + private final static String INVALID_TOPIC = "xxxxx"; + + private final static String ZK_DEFAULT_TOPIC = "_consumer_offsets"; + + private final static String NO_OFFSET_CHANGE_TOPIC_IN_ZK = "NoOffsetChangeTopic"; + + private final static Long REAL_CLUSTER_ID_IN_MYSQL = 1L; + + private final static Integer REAL_BROKER_ID_IN_ZK = 3; + + private final static Long INVALID_CLUSTER_ID = -1L; + + private final static Integer INVALID_PARTITION_ID = -1; + + @Autowired + @InjectMocks + private TopicService topicService; + + @Mock + private TopicManagerService topicManagerService; + + @Mock + private AppService appService; + + @Mock + private JmxService jmxService; + + @Autowired + private TopicRequestMetricsDao topicRequestMetricsDao; + + @BeforeMethod + public void setup() { + MockitoAnnotations.initMocks(this); + } + + private TopicMetricsDO getTopicMetricsDO() { + TopicMetricsDO topicMetricsDO = new TopicMetricsDO(); + topicMetricsDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL); + topicMetricsDO.setAppId("moduleTestAppId"); + topicMetricsDO.setTopicName(REAL_TOPIC1_IN_ZK); + topicMetricsDO.setMetrics(""); + topicMetricsDO.setGmtCreate(new Date()); + return topicMetricsDO; + } + + private TopicDO getTopicDO() { + TopicDO topicDO = new TopicDO(); + topicDO.setClusterId(REAL_CLUSTER_ID_IN_MYSQL); + topicDO.setTopicName(REAL_TOPIC1_IN_ZK); + topicDO.setAppId("moduleTestAppId"); + topicDO.setDescription(INVALID_TOPIC); + topicDO.setPeakBytesIn(100000L); + return topicDO; + } + + private AppDO getAppDO() { + AppDO appDO = new AppDO(); + appDO.setId(4L); + appDO.setAppId("moduleTestAppId"); + appDO.setName("moduleTestApp"); + appDO.setPassword("moduleTestApp"); + appDO.setType(1); + appDO.setApplicant("admin"); + appDO.setPrincipals("admin"); + appDO.setDescription("moduleTestApp"); + appDO.setCreateTime(new Date(1638786493173L)); + appDO.setModifyTime(new Date(1638786493173L)); + return appDO; + } + + public ClusterDO getClusterDO() { + ClusterDO clusterDO = new ClusterDO(); + clusterDO.setId(REAL_CLUSTER_ID_IN_MYSQL); + clusterDO.setClusterName("LogiKM_moduleTest"); + clusterDO.setZookeeper("10.190.46.198:2181,10.190.14.237:2181,10.190.50.65:2181/xg"); + clusterDO.setBootstrapServers("10.190.46.198:9093,10.190.14.237:9093,10.190.50.65:9093"); + clusterDO.setSecurityProperties("{ \t\"security.protocol\": \"SASL_PLAINTEXT\", \t\"sasl.mechanism\": \"PLAIN\", \t\"sasl.jaas.config\": \"org.apache.kafka.common.security.plain.PlainLoginModule required username=\\\"dkm_admin\\\" password=\\\"km_kMl4N8as1Kp0CCY\\\";\" }"); + clusterDO.setStatus(1); + clusterDO.setGmtCreate(new Date()); + clusterDO.setGmtModify(new Date()); + return clusterDO; + } + + private TopicDataSampleDTO getTopicDataSampleDTO() { + TopicDataSampleDTO topicDataSampleDTO = new TopicDataSampleDTO(); + topicDataSampleDTO.setPartitionId(0); + topicDataSampleDTO.setOffset(0L); + topicDataSampleDTO.setTimeout(5000); + topicDataSampleDTO.setTruncate(true); + topicDataSampleDTO.setMaxMsgNum(90); + return topicDataSampleDTO; + } + + private TopicMetrics getTopicMetrics() { + TopicMetrics topicMetrics = new TopicMetrics(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC1_IN_ZK); + String metrics = "{\"TotalFetchRequestsPerSecFiveMinuteRate\":4.132236103122026,\"BytesRejectedPerSecFiveMinuteRate\":0.0,\"TotalFetchRequestsPerSecFifteenMinuteRate\":1.5799208507558833,\"ProduceTotalTimeMs98thPercentile\":0.0,\"MessagesInPerSecMeanRate\":0.0,\"ProduceTotalTimeMs75thPercentile\":0.0,\"ProduceTotalTimeMs99thPercentile\":0.0,\"TotalProduceRequestsPerSecOneMinuteRate\":0.0,\"FailedProduceRequestsPerSecFifteenMinuteRate\":0.0,\"BytesInPerSecMeanRate\":0.0,\"TotalProduceRequestsPerSecFiveMinuteRate\":0.0,\"FetchConsumerTotalTimeMs999thPercentile\":0.0,\"FetchConsumerTotalTimeMs98thPercentile\":0.0,\"FetchConsumerTotalTimeMsMean\":0.0,\"FetchConsumerTotalTimeMs99thPercentile\":0.0,\"FailedFetchRequestsPerSecFifteenMinuteRate\":0.0,\"MessagesInPerSecFiveMinuteRate\":0.0,\"RequestHandlerAvgIdlePercentOneMinuteRate\":0.999221766772746,\"ProduceTotalTimeMsMean\":0.0,\"BytesInPerSecFiveMinuteRate\":0.0,\"FailedProduceRequestsPerSecMeanRate\":0.0,\"FailedFetchRequestsPerSecMeanRate\":0.0,\"FailedProduceRequestsPerSecFiveMinuteRate\":0.0,\"BytesOutPerSecFifteenMinuteRate\":0.0,\"BytesInPerSecOneMinuteRate\":0.0,\"BytesOutPerSecFiveMinuteRate\":0.0,\"HealthScore\":90,\"FailedFetchRequestsPerSecOneMinuteRate\":0.0,\"MessagesInPerSecOneMinuteRate\":0.0,\"BytesRejectedPerSecFifteenMinuteRate\":0.0,\"FailedFetchRequestsPerSecFiveMinuteRate\":0.0,\"RequestHandlerAvgIdlePercentFiveMinuteRate\":0.999803118809842,\"BytesOutPerSecOneMinuteRate\":0.0,\"ResponseQueueSizeValue\":0,\"MessagesInPerSecFifteenMinuteRate\":0.0,\"TotalProduceRequestsPerSecMeanRate\":0.0,\"BytesRejectedPerSecMeanRate\":0.0,\"TotalFetchRequestsPerSecMeanRate\":1.2674449706628523,\"NetworkProcessorAvgIdlePercentValue\":1.0,\"TotalFetchRequestsPerSecOneMinuteRate\":10.457259856316893,\"BytesInPerSecFifteenMinuteRate\":0.0,\"BytesOutPerSecMeanRate\":0.0,\"TotalProduceRequestsPerSecFifteenMinuteRate\":0.0,\"FetchConsumerTotalTimeMs50thPercentile\":0.0,\"RequestHandlerAvgIdlePercentFifteenMinuteRate\":0.9999287809186348,\"FetchConsumerTotalTimeMs95thPercentile\":0.0,\"FailedProduceRequestsPerSecOneMinuteRate\":0.0,\"CreateTime\":1638792321071,\"FetchConsumerTotalTimeMs75thPercentile\":0.0,\"ProduceTotalTimeMs999thPercentile\":0.0,\"RequestQueueSizeValue\":0,\"ProduceTotalTimeMs50thPercentile\":0.0,\"BytesRejectedPerSecOneMinuteRate\":0.0,\"RequestHandlerAvgIdlePercentMeanRate\":0.9999649184090593,\"ProduceTotalTimeMs95thPercentile\":0.0}"; + JSONObject jsonObject = JSON.parseObject(metrics); + Map metricsMap = new HashMap<>(); + for (Map.Entry stringObjectEntry : jsonObject.entrySet()) { + metricsMap.put(stringObjectEntry.getKey(), stringObjectEntry.getValue()); + } + topicMetrics.setMetricsMap(metricsMap); + return topicMetrics; + } + + @Test(description = "测试从DB获取监控数据") + public void getTopicMetricsFromDBTest() { + List list = topicService.getTopicMetricsFromDB(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC1_IN_ZK, new Date(0L), new Date()); + Assert.assertFalse(list.isEmpty()); + Assert.assertTrue(list.stream().allMatch(topicMetricsDO -> + topicMetricsDO.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL) && + topicMetricsDO.getTopicName().equals(REAL_TOPIC1_IN_ZK))); + } + + @Test + public void getTopicMetricsFromDBWithAppIdTest() { + List list = topicService.getTopicMetricsFromDB("1", REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC1_IN_ZK, new Date(0L), new Date()); + Assert.assertFalse(list.isEmpty()); + } + + @Test(description = "测试获取指定时间段内的峰值的均值流量") + public void getMaxAvgBytesInFromDBTest() { + // 为空 + getMaxAvgBytesInFromDB2NullTest(); + // 获取成功 + getMaxAvgBytesInFromDB2SuccessTest(); + } + + private void getMaxAvgBytesInFromDB2NullTest() { + Double result = topicService.getMaxAvgBytesInFromDB(REAL_CLUSTER_ID_IN_MYSQL, INVALID_TOPIC, new Date(0L), new Date()); + Assert.assertNull(result); + } + + private void getMaxAvgBytesInFromDB2SuccessTest() { + Double result = topicService.getMaxAvgBytesInFromDB(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC1_IN_ZK, new Date(0L), new Date()); + Assert.assertNotNull(result); + } + + @Test(description = "获取brokerId下所有的Topic及其对应的PartitionId") + public void getTopicPartitionIdMapTest() { + Map> topicPartitionIdMap = topicService.getTopicPartitionIdMap(REAL_CLUSTER_ID_IN_MYSQL, 1); + Assert.assertFalse(topicPartitionIdMap.isEmpty()); + Assert.assertTrue(topicPartitionIdMap.containsKey(REAL_TOPIC1_IN_ZK)); + } + + @Test(description = "测试获取 Topic 的 basic-info 信息") + public void getTopicBasicDTOTest() { + // TopicMetadata is Null + getTopicBasicDTO2TopicMetadataIsNull(); + // TopicDO is Null + getTopicBasicDTO2TopicMetadata2TopicDOIsNull(); + // TopicDO is not Null + getTopicBasicDTO2TopicMetadata2TopicDOIsNotNull(); + } + + private void getTopicBasicDTO2TopicMetadataIsNull() { + TopicBasicDTO result = topicService.getTopicBasicDTO(REAL_CLUSTER_ID_IN_MYSQL, INVALID_TOPIC); + Assert.assertEquals(result.getClusterId(), Long.valueOf(REAL_CLUSTER_ID_IN_MYSQL)); + Assert.assertEquals(result.getTopicName(), INVALID_TOPIC); + Assert.assertNull(result.getAppId()); + } + + private void getTopicBasicDTO2TopicMetadata2TopicDOIsNull() { + Mockito.when(topicManagerService.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(null); + TopicBasicDTO result = topicService.getTopicBasicDTO(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC1_IN_ZK); + Assert.assertNotNull(result); + Assert.assertEquals(result.getClusterId(), Long.valueOf(REAL_CLUSTER_ID_IN_MYSQL)); + Assert.assertEquals(result.getTopicName(), REAL_TOPIC1_IN_ZK); + Assert.assertNull(result.getDescription()); + Assert.assertNull(result.getAppId()); + } + + private void getTopicBasicDTO2TopicMetadata2TopicDOIsNotNull() { + TopicDO topicDO = getTopicDO(); + Mockito.when(topicManagerService.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(topicDO); + Mockito.when(appService.getByAppId(Mockito.anyString())).thenReturn(null); + TopicBasicDTO result = topicService.getTopicBasicDTO(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC1_IN_ZK); + Assert.assertNotNull(result); + Assert.assertEquals(result.getClusterId(), Long.valueOf(REAL_CLUSTER_ID_IN_MYSQL)); + Assert.assertEquals(result.getTopicName(), REAL_TOPIC1_IN_ZK); + Assert.assertEquals(result.getDescription(), topicDO.getDescription()); + // appId不存在 + Assert.assertNull(result.getAppId()); + // appId存在 + topicDO.setAppId("moduleTestAppId"); + Mockito.when(topicManagerService.getByTopicName(Mockito.anyLong(), Mockito.anyString())).thenReturn(topicDO); + Mockito.when(appService.getByAppId(Mockito.anyString())).thenReturn(getAppDO()); + TopicBasicDTO result2 = topicService.getTopicBasicDTO(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC1_IN_ZK); + Assert.assertNotNull(result2); + Assert.assertEquals(result2.getClusterId(), Long.valueOf(REAL_CLUSTER_ID_IN_MYSQL)); + Assert.assertEquals(result2.getTopicName(), REAL_TOPIC1_IN_ZK); + Assert.assertEquals(result2.getDescription(), topicDO.getDescription()); + Assert.assertEquals(result2.getAppId(), topicDO.getAppId()); + } + + @Test(description = "获取Topic的PartitionState信息") + public void getTopicPartitionDTOTest() { + // result is emptyList + getTopicPartitionDTO2EmptyTest(); + // needDetail is false + getTopicPartitionDTO2NeedDetailFalseTest(); + // needDetail is true + getTopicPartitionDTO2NeedDetailTrueTest(); + } + + private void getTopicPartitionDTO2EmptyTest() { + List list = topicService.getTopicPartitionDTO(null, null, true); + Assert.assertTrue(list.isEmpty()); + + ClusterDO clusterDO = new ClusterDO(); + clusterDO.setId(REAL_CLUSTER_ID_IN_MYSQL); + List list2 = topicService.getTopicPartitionDTO(clusterDO, INVALID_TOPIC, true); + Assert.assertTrue(list2.isEmpty()); + } + + private void getTopicPartitionDTO2NeedDetailFalseTest() { + Map map = new HashMap<>(); + PartitionAttributeDTO partitionAttributeDTO1 = new PartitionAttributeDTO(); + partitionAttributeDTO1.setLogSize(0L); + map.put(0, partitionAttributeDTO1); + map.put(1, null); + Mockito.when(jmxService.getPartitionAttribute( + Mockito.anyLong(), Mockito.anyString(), Mockito.anyList())).thenReturn(map); + + ClusterDO clusterDO = getClusterDO(); + List list = topicService.getTopicPartitionDTO(clusterDO, REAL_TOPIC1_IN_ZK, false); + Assert.assertFalse(list.isEmpty()); + Assert.assertEquals(list.size(), 2); + Assert.assertTrue(list.stream().allMatch(topicPartitionDTO -> + topicPartitionDTO.getBeginningOffset() == null && + topicPartitionDTO.getEndOffset() == null)); + } + + private void getTopicPartitionDTO2NeedDetailTrueTest() { + Map map = new HashMap<>(); + PartitionAttributeDTO partitionAttributeDTO1 = new PartitionAttributeDTO(); + partitionAttributeDTO1.setLogSize(0L); + map.put(0, partitionAttributeDTO1); + map.put(1, null); + Mockito.when(jmxService.getPartitionAttribute( + Mockito.anyLong(), Mockito.anyString(), Mockito.anyList())).thenReturn(map); + + ClusterDO clusterDO = getClusterDO(); + List list = topicService.getTopicPartitionDTO(clusterDO, REAL_TOPIC1_IN_ZK, true); + Assert.assertFalse(list.isEmpty()); + Assert.assertEquals(list.size(), 2); + Assert.assertTrue(list.stream().allMatch(topicPartitionDTO -> + topicPartitionDTO.getBeginningOffset() != null && + topicPartitionDTO.getEndOffset() != null)); + } + + @Test + public void getTopicMetricsFromJMXTest() { + Mockito.when(jmxService.getTopicMetrics( + Mockito.anyLong(), Mockito.anyString(), Mockito.anyInt(), Mockito.anyBoolean())).thenReturn(new TopicMetrics(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC1_IN_ZK)); + BaseMetrics result = topicService.getTopicMetricsFromJMX(REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC1_IN_ZK, 200, true); + Assert.assertNotNull(result); + } + + @Test(description = "测试获取Topic的分区的offset") + public void getPartitionOffsetTest() { + // 结果为空 + getPartitionOffset2EmptyTest(); + // 获取成功 + getPartitionOffset2SuccessTest(); + } + + private void getPartitionOffset2EmptyTest() { + ClusterDO clusterDO = getClusterDO(); + Map partitionOffset = topicService.getPartitionOffset( + null, null, OffsetPosEnum.BEGINNING); + Assert.assertTrue(partitionOffset.isEmpty()); + + Map partitionOffset2 = topicService.getPartitionOffset( + clusterDO, INVALID_TOPIC, OffsetPosEnum.BEGINNING); + Assert.assertTrue(partitionOffset2.isEmpty()); + } + + private void getPartitionOffset2SuccessTest() { + ClusterDO clusterDO = getClusterDO(); + // 获取beginning offset + Map partitionOffset1 = topicService.getPartitionOffset( + clusterDO, REAL_TOPIC1_IN_ZK, OffsetPosEnum.BEGINNING); + Assert.assertFalse(partitionOffset1.isEmpty()); + // 获取end offset + Map partitionOffset2 = topicService.getPartitionOffset( + clusterDO, REAL_TOPIC1_IN_ZK, OffsetPosEnum.END); + Assert.assertFalse(partitionOffset2.isEmpty()); + } + + @Test(description = "测试获取Topic概览信息,参数clusterId, brokerId") + public void getTopicOverviewListTest() { + // 结果为空 + getTopicOverviewList2EmptyTest(); + // 获取成功 + getTopicOverviewList2SuccessTest(); + } + + private void getTopicOverviewList2EmptyTest() { + List topicOverviewList = topicService.getTopicOverviewList(null, 1); + Assert.assertTrue(topicOverviewList.isEmpty()); + } + + private void getTopicOverviewList2SuccessTest() { + List topicOverviewList = topicService.getTopicOverviewList(REAL_CLUSTER_ID_IN_MYSQL, 1); + Assert.assertFalse(topicOverviewList.isEmpty()); + } + + @Test(description = "测试获取Topic概览信息,参数clusterId, topicNameList") + public void getTopicOverviewListWithTopicList() { + // 结果为空 + getTopicOverviewListWithTopicList2EmptyTest(); + + // topicDOList is null,appDOList is null, metrics is null + getTopicOverviewListWithTopicList2TopicAndApp1Test(); + + // topicDOList is null,appDOList is null, metrics is not null + getTopicOverviewListWithTopicList2TopicAndApp2Test(); + + // topicDOList is null,appDOList is not null, metrics is null + getTopicOverviewListWithTopicList2TopicAndApp3Test(); + + // topicDOList is null,appDOList is not null, metrics is not null + getTopicOverviewListWithTopicList2TopicAndApp4Test(); + + // topicDOList is not null,appDOList is null, metrics is null + getTopicOverviewListWithTopicList2TopicAndApp5Test(); + + // topicDOList is not null,appDOList is null, metrics is not null + getTopicOverviewListWithTopicList2TopicAndApp6Test(); + + // topicDOList is not null,appDOList is not null, metrics is null + getTopicOverviewListWithTopicList2TopicAndApp7Test(); + + // topicDOList is not null,appDOList is not null, metrics is not null + getTopicOverviewListWithTopicList2TopicAndApp8Test(); + + } + + private void getTopicOverviewListWithTopicList2EmptyTest() { + List topicOverviewList = topicService.getTopicOverviewList(null, Arrays.asList(REAL_TOPIC1_IN_ZK, ZK_DEFAULT_TOPIC, INVALID_TOPIC)); + Assert.assertTrue(topicOverviewList.isEmpty()); + } + + private void getTopicOverviewListWithTopicList2TopicAndApp1Test() { + Mockito.when(topicManagerService.getByClusterIdFromCache(Mockito.anyLong())).thenReturn(null); + Mockito.when(appService.listAll()).thenReturn(null); + Mockito.when(jmxService.getTopicMetrics( + Mockito.anyLong(), Mockito.anyString(), Mockito.anyInt(), Mockito.anyBoolean())).thenReturn(null); + + List topics = Arrays.asList(REAL_TOPIC1_IN_ZK, ZK_DEFAULT_TOPIC, INVALID_TOPIC); + List topicOverviewList = topicService.getTopicOverviewList(REAL_CLUSTER_ID_IN_MYSQL, topics); + Assert.assertFalse(topicOverviewList.isEmpty()); + Assert.assertTrue(topicOverviewList.stream().allMatch(topicOverview -> + topicOverview.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL) && + topics.contains(topicOverview.getTopicName()) && + topicOverview.getAppId() == null && + topicOverview.getAppName() == null && + topicOverview.getByteIn() == null)); + } + + private void getTopicOverviewListWithTopicList2TopicAndApp2Test() { + Mockito.when(topicManagerService.getByClusterIdFromCache(Mockito.anyLong())).thenReturn(null); + Mockito.when(appService.listAll()).thenReturn(null); + TopicMetrics topicMetrics = getTopicMetrics(); + Mockito.when(jmxService.getTopicMetrics( + Mockito.anyLong(), Mockito.anyString(), Mockito.anyInt(), Mockito.anyBoolean())). + thenReturn(topicMetrics); + + List topics = Arrays.asList(REAL_TOPIC1_IN_ZK, ZK_DEFAULT_TOPIC, INVALID_TOPIC); + List topicOverviewList = topicService.getTopicOverviewList(REAL_CLUSTER_ID_IN_MYSQL, topics); + Assert.assertFalse(topicOverviewList.isEmpty()); + Assert.assertTrue(topicOverviewList.stream().allMatch(topicOverview -> + topicOverview.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL) && + topics.contains(topicOverview.getTopicName()) && + topicOverview.getAppId() == null && + topicOverview.getAppName() == null && + topicOverview.getByteIn() != null)); + } + + private void getTopicOverviewListWithTopicList2TopicAndApp3Test() { + Mockito.when(topicManagerService.getByClusterIdFromCache(Mockito.anyLong())).thenReturn(null); + AppDO appDO = getAppDO(); + Mockito.when(appService.listAll()).thenReturn(Arrays.asList(appDO)); + + Mockito.when(jmxService.getTopicMetrics( + Mockito.anyLong(), Mockito.anyString(), Mockito.anyInt(), Mockito.anyBoolean())). + thenReturn(null); + + List topics = Arrays.asList(REAL_TOPIC1_IN_ZK, ZK_DEFAULT_TOPIC, INVALID_TOPIC); + List topicOverviewList = topicService.getTopicOverviewList(REAL_CLUSTER_ID_IN_MYSQL, topics); + Assert.assertFalse(topicOverviewList.isEmpty()); + Assert.assertTrue(topicOverviewList.stream().allMatch(topicOverview -> + topicOverview.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL) && + topics.contains(topicOverview.getTopicName()) && + topicOverview.getAppId() == null && + topicOverview.getAppName() == null && + topicOverview.getByteIn() == null)); + } + + private void getTopicOverviewListWithTopicList2TopicAndApp4Test() { + Mockito.when(topicManagerService.getByClusterIdFromCache(Mockito.anyLong())).thenReturn(null); + AppDO appDO = getAppDO(); + Mockito.when(appService.listAll()).thenReturn(Arrays.asList(appDO)); + + TopicMetrics topicMetrics = getTopicMetrics(); + Mockito.when(jmxService.getTopicMetrics( + Mockito.anyLong(), Mockito.anyString(), Mockito.anyInt(), Mockito.anyBoolean())). + thenReturn(topicMetrics); + + List topics = Arrays.asList(REAL_TOPIC1_IN_ZK, ZK_DEFAULT_TOPIC, INVALID_TOPIC); + List topicOverviewList = topicService.getTopicOverviewList(REAL_CLUSTER_ID_IN_MYSQL, topics); + Assert.assertFalse(topicOverviewList.isEmpty()); + Assert.assertTrue(topicOverviewList.stream().allMatch(topicOverview -> + topicOverview.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL) && + topics.contains(topicOverview.getTopicName()) && + topicOverview.getAppId() == null && + topicOverview.getAppName() == null && + topicOverview.getByteIn() != null)); + } + + private void getTopicOverviewListWithTopicList2TopicAndApp5Test() { + TopicDO topicDO = getTopicDO(); + Mockito.when(topicManagerService.getByClusterIdFromCache(Mockito.anyLong())).thenReturn(Arrays.asList(topicDO)); + Mockito.when(appService.listAll()).thenReturn(null); + Mockito.when(jmxService.getTopicMetrics( + Mockito.anyLong(), Mockito.anyString(), Mockito.anyInt(), Mockito.anyBoolean())).thenReturn(null); + + List topics = Arrays.asList(REAL_TOPIC1_IN_ZK, ZK_DEFAULT_TOPIC, INVALID_TOPIC); + List topicOverviewList = topicService.getTopicOverviewList(REAL_CLUSTER_ID_IN_MYSQL, topics); + Assert.assertFalse(topicOverviewList.isEmpty()); + Assert.assertTrue(topicOverviewList.stream().allMatch(topicOverview -> + topicOverview.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL) && + topics.contains(topicOverview.getTopicName()) && + topicOverview.getAppId().equals(topicDO.getAppId()) && + topicOverview.getAppName() == null && + topicOverview.getByteIn() == null)); + } + + private void getTopicOverviewListWithTopicList2TopicAndApp6Test() { + TopicDO topicDO = getTopicDO(); + Mockito.when(topicManagerService.getByClusterIdFromCache(Mockito.anyLong())).thenReturn(Arrays.asList(topicDO)); + Mockito.when(appService.listAll()).thenReturn(null); + TopicMetrics topicMetrics = getTopicMetrics(); + Mockito.when(jmxService.getTopicMetrics( + Mockito.anyLong(), Mockito.anyString(), Mockito.anyInt(), Mockito.anyBoolean())). + thenReturn(topicMetrics); + + List topics = Arrays.asList(REAL_TOPIC1_IN_ZK, ZK_DEFAULT_TOPIC, INVALID_TOPIC); + List topicOverviewList = topicService.getTopicOverviewList(REAL_CLUSTER_ID_IN_MYSQL, topics); + Assert.assertFalse(topicOverviewList.isEmpty()); + Assert.assertTrue(topicOverviewList.stream().allMatch(topicOverview -> + topicOverview.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL) && + topics.contains(topicOverview.getTopicName()) && + topicOverview.getAppId().equals(topicDO.getAppId()) && + topicOverview.getAppName() == null && + topicOverview.getByteIn() != null)); + } + + private void getTopicOverviewListWithTopicList2TopicAndApp7Test() { + TopicDO topicDO = getTopicDO(); + Mockito.when(topicManagerService.getByClusterIdFromCache(Mockito.anyLong())).thenReturn(Arrays.asList(topicDO)); + AppDO appDO = getAppDO(); + Mockito.when(appService.listAll()).thenReturn(Arrays.asList(appDO)); + + Mockito.when(jmxService.getTopicMetrics( + Mockito.anyLong(), Mockito.anyString(), Mockito.anyInt(), Mockito.anyBoolean())). + thenReturn(null); + + List topics = Arrays.asList(REAL_TOPIC1_IN_ZK, ZK_DEFAULT_TOPIC, INVALID_TOPIC); + List topicOverviewList = topicService.getTopicOverviewList(REAL_CLUSTER_ID_IN_MYSQL, topics); + Assert.assertFalse(topicOverviewList.isEmpty()); + Assert.assertTrue(topicOverviewList.stream().allMatch(topicOverview -> + topicOverview.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL) && + topics.contains(topicOverview.getTopicName()) && + topicOverview.getAppId().equals(topicDO.getAppId()) && + topicOverview.getAppName().equals(appDO.getName()) && + topicOverview.getByteIn() == null)); + } + + private void getTopicOverviewListWithTopicList2TopicAndApp8Test() { + TopicDO topicDO = getTopicDO(); + Mockito.when(topicManagerService.getByClusterIdFromCache(Mockito.anyLong())).thenReturn(Arrays.asList(topicDO)); + AppDO appDO = getAppDO(); + Mockito.when(appService.listAll()).thenReturn(Arrays.asList(appDO)); + + TopicMetrics topicMetrics = getTopicMetrics(); + Mockito.when(jmxService.getTopicMetrics( + Mockito.anyLong(), Mockito.anyString(), Mockito.anyInt(), Mockito.anyBoolean())). + thenReturn(topicMetrics); + + List topics = Arrays.asList(REAL_TOPIC1_IN_ZK, ZK_DEFAULT_TOPIC, INVALID_TOPIC); + List topicOverviewList = topicService.getTopicOverviewList(REAL_CLUSTER_ID_IN_MYSQL, topics); + Assert.assertFalse(topicOverviewList.isEmpty()); + Assert.assertTrue(topicOverviewList.stream().allMatch(topicOverview -> + topicOverview.getClusterId().equals(REAL_CLUSTER_ID_IN_MYSQL) && + topics.contains(topicOverview.getTopicName()) && + topicOverview.getAppId().equals(topicDO.getAppId()) && + topicOverview.getAppName().equals(appDO.getName()) && + topicOverview.getByteIn() != null)); + } + + @Test(description = "测试获取指定时间的offset信息") + public void getPartitionOffsetListTest() { + // 结果为空 + getPartitionOffsetList2EmptyTest(); + // 获取成功 + getPartitionOffsetList2SuccessTest(); + } + + private void getPartitionOffsetList2EmptyTest() { + ClusterDO clusterDO = getClusterDO(); + List list = topicService.getPartitionOffsetList(clusterDO, INVALID_TOPIC, 0L); + Assert.assertTrue(list.isEmpty()); + } + + private void getPartitionOffsetList2SuccessTest() { + ClusterDO clusterDO = getClusterDO(); + List list = topicService.getPartitionOffsetList(clusterDO, REAL_TOPIC1_IN_ZK, 0L); + Assert.assertFalse(list.isEmpty()); + } + + @Test() + public void getTopicPartitionStateTest() { + // 结果为空 + getTopicPartitionState2EmptyTest(); + + // 获取结果成功 + getTopicPartitionState2SuccessTest(); + } + + private void getTopicPartitionState2EmptyTest() { + Map> map1 = + topicService.getTopicPartitionState(null, REAL_BROKER_ID_IN_ZK); + Assert.assertTrue(map1.isEmpty()); + + Map> map2 = + topicService.getTopicPartitionState(INVALID_CLUSTER_ID, REAL_BROKER_ID_IN_ZK); + Assert.assertTrue(map2.isEmpty()); + } + + /** + * 共有三个topic, REAL_TOPIC1_IN_ZK, REAL_TOPIC2_IN_ZK, ZK_DEFAULT_TOPIC + */ + private void getTopicPartitionState2SuccessTest() { + Map> map1 = + topicService.getTopicPartitionState(REAL_CLUSTER_ID_IN_MYSQL, REAL_BROKER_ID_IN_ZK); + Assert.assertFalse(map1.isEmpty()); + } + + @Test(description = "测试数据采样") + public void fetchTopicDataTest() { + // invalid partitionId + fetchTopicData2InvalidPartitionId(); + // 指定了offset,截断 + fetchTopicData2OffsetAndTruncate(); + // 指定了offset,未截断 + fetchTopicData2OffsetAndNoTruncate(); + // 未指定offset, 返回空 + fetchTopicData2NoOffset2Empty(); + // 未指定offset,截断 + fetchTopicData2NoOffsetAndTruncate(); + // 未指定offset,未截断 + fetchTopicData2NoOffsetAndNoTruncate(); + } + + private void fetchTopicData2InvalidPartitionId() { + ClusterDO clusterDO = getClusterDO(); + TopicDataSampleDTO topicDataSampleDTO = getTopicDataSampleDTO(); + topicDataSampleDTO.setPartitionId(INVALID_PARTITION_ID); + List result = topicService.fetchTopicData(clusterDO, REAL_TOPIC1_IN_ZK, topicDataSampleDTO); + Assert.assertTrue(result.isEmpty()); + } + + private void fetchTopicData2NoOffsetAndTruncate() { + ClusterDO clusterDO = getClusterDO(); + TopicDataSampleDTO topicDataSampleDTO = getTopicDataSampleDTO(); + topicDataSampleDTO.setOffset(null); + List result = topicService.fetchTopicData(clusterDO, REAL_TOPIC1_IN_ZK, topicDataSampleDTO); + Assert.assertFalse(result.isEmpty()); + Assert.assertTrue(result.stream().allMatch( + value -> value.length() <= TopicSampleConstant.MAX_DATA_LENGTH_UNIT_BYTE)); + } + + private void fetchTopicData2NoOffsetAndNoTruncate() { + ClusterDO clusterDO = getClusterDO(); + TopicDataSampleDTO topicDataSampleDTO = getTopicDataSampleDTO(); + topicDataSampleDTO.setOffset(null); + topicDataSampleDTO.setTruncate(false); + List result = topicService.fetchTopicData(clusterDO, REAL_TOPIC1_IN_ZK, topicDataSampleDTO); + Assert.assertFalse(result.isEmpty()); + Assert.assertTrue(result.stream().allMatch( + value -> value.length() > TopicSampleConstant.MAX_DATA_LENGTH_UNIT_BYTE)); + } + + private void fetchTopicData2OffsetAndTruncate() { + ClusterDO clusterDO = getClusterDO(); + TopicDataSampleDTO topicDataSampleDTO = getTopicDataSampleDTO(); + List result = topicService.fetchTopicData(clusterDO, REAL_TOPIC1_IN_ZK, topicDataSampleDTO); + Assert.assertFalse(result.isEmpty()); + Assert.assertTrue(result.stream().allMatch( + value -> value.length() <= TopicSampleConstant.MAX_DATA_LENGTH_UNIT_BYTE)); + } + + private void fetchTopicData2OffsetAndNoTruncate() { + ClusterDO clusterDO = getClusterDO(); + TopicDataSampleDTO topicDataSampleDTO = getTopicDataSampleDTO(); + topicDataSampleDTO.setTruncate(false); + List result = topicService.fetchTopicData(clusterDO, REAL_TOPIC1_IN_ZK, topicDataSampleDTO); + Assert.assertFalse(result.isEmpty()); + Assert.assertTrue(result.stream().allMatch( + value -> value.length() > TopicSampleConstant.MAX_DATA_LENGTH_UNIT_BYTE)); + } + + private void fetchTopicData2NoOffset2Empty() { + ClusterDO clusterDO = getClusterDO(); + TopicDataSampleDTO topicDataSampleDTO = getTopicDataSampleDTO(); + topicDataSampleDTO.setOffset(null); + topicDataSampleDTO.setTimeout(-1); + List result = topicService.fetchTopicData(clusterDO, REAL_TOPIC1_IN_ZK, topicDataSampleDTO); + Assert.assertTrue(result.isEmpty()); + } + + @Test(description = "测试从数据库中获取requestMetrics指标") + public void getTopicRequestMetricsFromDBTest() { + TopicMetricsDO topicMetricsDO1 = getTopicMetricsDO(); + topicRequestMetricsDao.add(topicMetricsDO1); + + Date startTime = new Date(0L); + Date endTime = new Date(); + List result = topicService.getTopicRequestMetricsFromDB( + topicMetricsDO1.getClusterId(), topicMetricsDO1.getTopicName(), startTime, endTime); + Assert.assertFalse(result.isEmpty()); + Assert.assertTrue(result.stream().allMatch(topicMetricsDO -> + topicMetricsDO.getClusterId().equals(topicMetricsDO1.getClusterId()) && + topicMetricsDO.getTopicName().equals(topicMetricsDO1.getTopicName()) && + topicMetricsDO.getGmtCreate().after(startTime) && + topicMetricsDO.getGmtCreate().before(endTime))); + } + + @Test(description = "测试获取topic的broker列表") + public void getTopicBrokerListTest() { + List topicBrokerList = topicService.getTopicBrokerList( + REAL_CLUSTER_ID_IN_MYSQL, REAL_TOPIC2_IN_ZK); + Assert.assertFalse(topicBrokerList.isEmpty()); + } + + @Test(description = "测试topic是否有数据写入") + public void checkTopicOffsetChangedTest() { + // physicalCluster does not exist + checkTopicOffsetChanged2ClusterNotExistTest(); + // endOffsetMap is empty + checkTopicOffsetChanged2UnknownTest(); + // dtoList is not empty and result is Yes + checkTopicOffsetChanged2dtoListNotNullAndYesTest(); + // dtoList is empty and result is No + checkTopicOffsetChanged2NoTest(); + } + + private void checkTopicOffsetChanged2ClusterNotExistTest() { + Result result = + topicService.checkTopicOffsetChanged(INVALID_CLUSTER_ID, REAL_TOPIC1_IN_ZK, 0L); + Assert.assertEquals(result.getCode(), ResultStatus.CLUSTER_NOT_EXIST.getCode()); + } + + private void checkTopicOffsetChanged2UnknownTest() { + Result result = + topicService.checkTopicOffsetChanged(REAL_CLUSTER_ID_IN_MYSQL, INVALID_TOPIC, 0L); + Assert.assertEquals(result.getData().getCode(), TopicOffsetChangedEnum.UNKNOWN.getCode()); + } + + private void checkTopicOffsetChanged2dtoListNotNullAndYesTest() { + Result result = topicService.checkTopicOffsetChanged( + REAL_CLUSTER_ID_IN_MYSQL, + REAL_TOPIC1_IN_ZK, + System.currentTimeMillis()); + Assert.assertNotNull(result); + Assert.assertEquals(result.getData().getCode(), TopicOffsetChangedEnum.YES.getCode()); + } + + private void checkTopicOffsetChanged2NoTest() { + Result result = topicService.checkTopicOffsetChanged( + REAL_CLUSTER_ID_IN_MYSQL, + NO_OFFSET_CHANGE_TOPIC_IN_ZK, + System.currentTimeMillis()); + Assert.assertNotNull(result); + Assert.assertEquals(result.getData().getCode(), TopicOffsetChangedEnum.NO.getCode()); + } + +}