diff --git a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/KnowStreamApplicationTest.java b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/KnowStreamApplicationTest.java
index e8ce0d27..97556191 100644
--- a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/KnowStreamApplicationTest.java
+++ b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/KnowStreamApplicationTest.java
@@ -1,13 +1,10 @@
package com.xiaojukeji.know.streaming.km;
import com.xiaojukeji.know.streaming.km.rest.KnowStreaming;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import com.xiaojukeji.know.streaming.test.km.KMBase;
import org.junit.jupiter.api.extension.ExtendWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.web.server.LocalServerPort;
-import org.springframework.http.HttpHeaders;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit.jupiter.SpringExtension;
@@ -15,32 +12,19 @@ import org.springframework.test.context.junit.jupiter.SpringExtension;
/**
* @author d06679
* @date 2019/4/11
- *
+ *
* 得使用随机端口号,这样行执行单元测试的时候,不会出现端口号占用的情况
*/
@ActiveProfiles("test")
@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = KnowStreaming.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
-public class KnowStreamApplicationTest {
-
- protected HttpHeaders headers;
-
+public class KnowStreamApplicationTest extends KMBase {
@LocalServerPort
private Integer port;
- @BeforeEach
- public void setUp() {
- // 获取 springboot server 监听的端口号
- // port = applicationContext.getWebServer().getPort();
- System.out.println( String.format("port is : [%d]", port));
-//
-// headers = new HttpHeaders();
-// headers.add("X-SSO-USER", "zengqiao");
- }
-
- @Test
- public void test() {
- Assertions.assertNotNull(port);
- }
+// @Test
+// public void test() {
+// Assertions.assertNotNull(port);
+// }
}
\ No newline at end of file
diff --git a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/core/TopicMetricServiceTest.java b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/core/TopicMetricServiceTest.java
index 36ad96c3..2a2c34a9 100644
--- a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/core/TopicMetricServiceTest.java
+++ b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/core/TopicMetricServiceTest.java
@@ -18,12 +18,13 @@ import java.util.List;
public class TopicMetricServiceTest extends KnowStreamApplicationTest {
+ Long clusterId = 1L;
+
@Autowired
private TopicMetricService topicMetricService;
@Test
public void listTopicMetricsFromESTest(){
- Long clusterId = 1l;
Long endTime = System.currentTimeMillis();
Long startTime = endTime - 3600 * 1000;
@@ -47,7 +48,6 @@ public class TopicMetricServiceTest extends KnowStreamApplicationTest {
@Test
public void pagingTopicWithLatestMetricsFromESTest(){
- Long clusterId = 2l;
List metricNameList = new ArrayList<>();
SearchSort sort = new SearchSort();
sort.setQueryName("LogSize");
diff --git a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/core/service/cluster/ClusterPhyServiceTest.java b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/core/service/cluster/ClusterPhyServiceTest.java
new file mode 100644
index 00000000..14f07072
--- /dev/null
+++ b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/core/service/cluster/ClusterPhyServiceTest.java
@@ -0,0 +1,54 @@
+package com.xiaojukeji.know.streaming.km.core.service.cluster;
+
+import com.xiaojukeji.know.streaming.km.KnowStreamApplicationTest;
+import com.xiaojukeji.know.streaming.km.common.bean.dto.cluster.ClusterPhyAddDTO;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.cluster.ClusterPhy;
+import com.xiaojukeji.know.streaming.km.common.bean.entity.config.JmxConfig;
+import com.xiaojukeji.know.streaming.km.common.converter.ClusterConverter;
+import com.xiaojukeji.know.streaming.km.common.exception.AdminOperateException;
+import com.xiaojukeji.know.streaming.km.common.exception.DuplicateException;
+import com.xiaojukeji.know.streaming.km.common.exception.ParamErrorException;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import java.util.List;
+import java.util.Properties;
+
+@Slf4j
+public class ClusterPhyServiceTest extends KnowStreamApplicationTest {
+ @Autowired
+ private ClusterPhyService clusterPhyService;
+
+ @Test
+ @Order(Integer.MIN_VALUE)
+ void addClusterPhyTest() {
+ try {
+ Properties properties = new Properties();
+ JmxConfig jmxConfig = new JmxConfig();
+ jmxConfig.setOpenSSL(false);
+
+ ClusterPhyAddDTO dto = new ClusterPhyAddDTO();
+ dto.setName("test");
+ dto.setDescription("");
+ dto.setKafkaVersion(kafkaVersion());
+ dto.setJmxProperties(jmxConfig);
+ dto.setClientProperties(properties);
+ dto.setZookeeper(zookeeperUrl());
+ dto.setBootstrapServers(bootstrapServers());
+ Assertions.assertEquals(1,
+ clusterPhyService.addClusterPhy(ClusterConverter.convert2ClusterPhyPO(dto), "root"));
+ } catch (ParamErrorException | DuplicateException | AdminOperateException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Test
+ void listAllClustersTest() {
+ List clusterPhies = clusterPhyService.listAllClusters();
+ Assertions.assertNotNull(clusterPhies);
+ log.info("集群列表:{}", clusterPhies);
+ }
+}
diff --git a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/BrokerMetricESDAOTest.java b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/BrokerMetricESDAOTest.java
index f08a229a..fdb574ab 100644
--- a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/BrokerMetricESDAOTest.java
+++ b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/BrokerMetricESDAOTest.java
@@ -6,6 +6,7 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchRange;
import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchSort;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.BrokerMetricESDAO;
+import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
@@ -14,8 +15,11 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
+@Slf4j
public class BrokerMetricESDAOTest extends KnowStreamApplicationTest {
+ Long clusterId = 1L;
+
@Autowired
private BrokerMetricESDAO brokerMetriceESDAO;
@@ -25,7 +29,7 @@ public class BrokerMetricESDAOTest extends KnowStreamApplicationTest {
SearchSort def = new SearchSort("timestamp", true);
String sortDsl = brokerMetriceESDAO.buildSortDsl(sort, def);
- System.out.println(sortDsl);
+ log.info(sortDsl);
}
@Test
@@ -33,7 +37,7 @@ public class BrokerMetricESDAOTest extends KnowStreamApplicationTest {
SearchRange sort = new SearchRange("age", 1232321f, 45345345345f);
String sortDsl = brokerMetriceESDAO.buildRangeDsl(sort);
- System.out.println(sortDsl);
+ log.info(sortDsl);
}
@Test
@@ -44,12 +48,11 @@ public class BrokerMetricESDAOTest extends KnowStreamApplicationTest {
String matchDsl = brokerMetriceESDAO.buildMatchDsl(matches);
- System.out.println(matchDsl);
+ log.info(matchDsl);
}
@Test
public void getBrokerMetricsPointTest(){
- Long clusterId = 2L;
Integer brokerId = 1;
List metrics = Arrays.asList("BytesIn", "BytesIn_min_5");
Long endTime = System.currentTimeMillis();
@@ -63,7 +66,6 @@ public class BrokerMetricESDAOTest extends KnowStreamApplicationTest {
@Test
public void listBrokerMetricesByBrokerIdsTest(){
- Long clusterId = 123L;
List metrics = Arrays.asList("BytesInPerSec_min_1", "BytesInPerSec_min_15");
List brokerIds = Arrays.asList(1L);
Long endTime = System.currentTimeMillis();
@@ -74,7 +76,6 @@ public class BrokerMetricESDAOTest extends KnowStreamApplicationTest {
@Test
public void listBrokerMetricsByTopTest(){
- Long clusterId = 123L;
List metrics = Arrays.asList("BytesInPerSec_min_1", "BytesInPerSec_min_15");
Long endTime = System.currentTimeMillis();
Long startTime = endTime - 4 * 60 * 60 * 1000;
@@ -84,7 +85,6 @@ public class BrokerMetricESDAOTest extends KnowStreamApplicationTest {
@Test
public void getTopBrokerIdsTest(){
- Long clusterId = 123L;
List metrics = Arrays.asList("BytesInPerSec_min_1", "BytesInPerSec_min_15");
Long endTime = System.currentTimeMillis();
Long startTime = endTime - 4 * 60 * 60 * 1000;
diff --git a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ClusterMetricESDAOTest.java b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ClusterMetricESDAOTest.java
index d0f96bff..46e3d31a 100644
--- a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ClusterMetricESDAOTest.java
+++ b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/ClusterMetricESDAOTest.java
@@ -15,6 +15,8 @@ import java.util.*;
public class ClusterMetricESDAOTest extends KnowStreamApplicationTest {
+ Long clusterId = 1L;
+
@Autowired
private ClusterMetricESDAO clusterMetricESDAO;
@@ -34,7 +36,6 @@ public class ClusterMetricESDAOTest extends KnowStreamApplicationTest {
*/
@Test
public void getClusterMetricsPointTest(){
- Long clusterId = 1L;
List metrics = Arrays.asList(
"Connections", "BytesIn_min_15", "PartitionURP",
"HealthScore_Topics", "EventQueueSize", "ActiveControllerCount",
@@ -67,7 +68,6 @@ public class ClusterMetricESDAOTest extends KnowStreamApplicationTest {
*/
@Test
public void getClusterLatestMetricsTest(){
- Long clusterId = 1L;
List metrics = Collections.emptyList();
ClusterMetricPO clusterLatestMetrics = clusterMetricESDAO.getClusterLatestMetrics(clusterId, metrics);
diff --git a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/GroupMetricESDAOTest.java b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/GroupMetricESDAOTest.java
index bc6d5821..4d0c5654 100644
--- a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/GroupMetricESDAOTest.java
+++ b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/GroupMetricESDAOTest.java
@@ -20,12 +20,13 @@ import java.util.Set;
public class GroupMetricESDAOTest extends KnowStreamApplicationTest {
+ Long clusterId = 1L;
+
@Autowired
private GroupMetricESDAO groupMetricESDAO;
@Test
public void listLatestMetricsAggByGroupTopicTest(){
- Long clusterPhyId = 2L;
List groupTopicList = new ArrayList<>();
groupTopicList.add(new GroupTopic("g-know-streaming-123456", "know-streaming-test-251"));
groupTopicList.add(new GroupTopic("test_group", "know-streaming-test-251"));
@@ -33,14 +34,13 @@ public class GroupMetricESDAOTest extends KnowStreamApplicationTest {
List metrics = Arrays.asList("OffsetConsumed", "Lag");
AggTypeEnum aggType = AggTypeEnum.AVG;
- List groupMetricPOS = groupMetricESDAO.listLatestMetricsAggByGroupTopic(clusterPhyId, groupTopicList, metrics, aggType);
+ List groupMetricPOS = groupMetricESDAO.listLatestMetricsAggByGroupTopic(clusterId, groupTopicList, metrics, aggType);
assert !CollectionUtils.isEmpty(groupMetricPOS);
}
@Test
public void listGroupTopicPartitionsTest(){
- Long clusterId = 2L;
String groupName = "g-know-streaming-123456";
Long endTime = System.currentTimeMillis();
Long startTime = endTime - 24 * 3600 * 1000;
@@ -51,17 +51,15 @@ public class GroupMetricESDAOTest extends KnowStreamApplicationTest {
@Test
public void listPartitionLatestMetricsTest(){
- Long clusterId = 2L;
String groupName = "test_group_20220421";
String topicName = "know-streaming-test-251";
List groupMetricPOS = groupMetricESDAO.listPartitionLatestMetrics(clusterId, groupName, topicName, null);
- assert !CollectionUtils.isEmpty(groupMetricPOS);
+ assert CollectionUtils.isEmpty(groupMetricPOS);
}
@Test
public void countMetricValueTest(){
- Long clusterId = 3L;
String groupName = "test_group";
SearchTerm searchTerm = new SearchTerm("HealthCheckTotal", "1", false);
@@ -75,7 +73,6 @@ public class GroupMetricESDAOTest extends KnowStreamApplicationTest {
@Test
public void listGroupMetricsTest(){
- Long clusterId = 2L;
String groupName = "g-know-streaming-123456";
Long endTime = System.currentTimeMillis();
Long startTime = endTime - 24 * 3600 * 1000;
diff --git a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/PartitionMetricESDAOTest.java b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/PartitionMetricESDAOTest.java
index 5721cbf0..9c8ac21b 100644
--- a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/PartitionMetricESDAOTest.java
+++ b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/PartitionMetricESDAOTest.java
@@ -11,16 +11,17 @@ import java.util.List;
public class PartitionMetricESDAOTest extends KnowStreamApplicationTest {
+ Long clusterId = 1L;
+
@Autowired
private PartitionMetricESDAO partitionMetricESDAO;
@Test
public void listPartitionLatestMetricsByTopicTest(){
- Long clusterPhyId = 2L;
String topic = "__consumer_offsets";
List partitionMetricPOS = partitionMetricESDAO.listPartitionLatestMetricsByTopic(
- clusterPhyId, topic, new ArrayList<>());
+ clusterId, topic, new ArrayList<>());
assert null != partitionMetricPOS;
}
diff --git a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/TopicMetricESDAOTest.java b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/TopicMetricESDAOTest.java
index 09db0971..90ef291a 100644
--- a/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/TopicMetricESDAOTest.java
+++ b/km-rest/src/test/java/com/xiaojukeji/know/streaming/km/persistence/es/TopicMetricESDAOTest.java
@@ -8,22 +8,25 @@ import com.xiaojukeji.know.streaming.km.common.bean.entity.search.SearchSort;
import com.xiaojukeji.know.streaming.km.common.bean.po.metrice.TopicMetricPO;
import com.xiaojukeji.know.streaming.km.common.bean.vo.metrics.point.MetricPointVO;
import com.xiaojukeji.know.streaming.km.persistence.es.dao.TopicMetricESDAO;
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+@Slf4j
public class TopicMetricESDAOTest extends KnowStreamApplicationTest {
+ Long clusterId = 1L;
+
@Autowired
private TopicMetricESDAO topicMetricESDAO;
@Test
public void listTopicMaxMinMetricsTest(){
- Long clusterId = 2L;
String topic = "know-streaming-test-251";
String topic1 = "topic_test01";
Long endTime = System.currentTimeMillis();
@@ -36,7 +39,6 @@ public class TopicMetricESDAOTest extends KnowStreamApplicationTest {
@Test
public void getTopicsAggsMetricsValueTest(){
- Long clusterId = 2L;
List topicList = Arrays.asList("know-streaming-test-251", "topic_test01");
List metrics = Arrays.asList(
"Messages", "BytesIn_min_15", "BytesRejected",
@@ -56,7 +58,6 @@ public class TopicMetricESDAOTest extends KnowStreamApplicationTest {
@Test
public void listTopicWithLatestMetricsTest(){
- Long clusterId = 2L;
SearchSort sort = new SearchSort("LogSize", true);
sort.setMetric(true);
@@ -65,12 +66,11 @@ public class TopicMetricESDAOTest extends KnowStreamApplicationTest {
List topicMetricPOS = topicMetricESDAO.listTopicWithLatestMetrics(clusterId, sort, fuzzy, null, terms);
- assert !CollectionUtils.isEmpty(topicMetricPOS);
+ log.info("{}", topicMetricPOS);
}
@Test
public void getTopicLatestMetricByBrokerIdTest(){
- Long clusterId = 2L;
String topic = "know-streaming-test-251";
Integer brokerId = 1;
@@ -81,7 +81,6 @@ public class TopicMetricESDAOTest extends KnowStreamApplicationTest {
@Test
public void getTopicLatestMetricTest(){
- Long clusterId = 2L;
String topic = "know-streaming-test-251";
TopicMetricPO topicMetricPO = topicMetricESDAO.getTopicLatestMetric(clusterId, topic, new ArrayList<>());
@@ -91,7 +90,6 @@ public class TopicMetricESDAOTest extends KnowStreamApplicationTest {
@Test
public void listTopicLatestMetricTest(){
- Long clusterId = 2L;
String topic = "know-streaming-test-251";
String topic1 = "know-streaming-123";
String topic2 = "1209test";
@@ -112,7 +110,6 @@ public class TopicMetricESDAOTest extends KnowStreamApplicationTest {
@Test
public void listBrokerMetricsByTopicsTest(){
- Long clusterId = 2L;
List metrics = Arrays.asList(
"Messages", "BytesIn_min_15", "BytesRejected",
"PartitionURP", "HealthCheckTotal", "ReplicationCount",
@@ -125,12 +122,13 @@ public class TopicMetricESDAOTest extends KnowStreamApplicationTest {
Long endTime = System.currentTimeMillis();
Long startTime = endTime - 4 * 60 * 60 * 1000;
- topicMetricESDAO.listTopicMetricsByTopics(clusterId, metrics, "avg", topics, startTime, endTime);
+ Table> list =
+ topicMetricESDAO.listTopicMetricsByTopics(clusterId, metrics, "avg", topics, startTime, endTime);
+ Assertions.assertNotNull(list);
}
@Test
public void countMetricValueOccurrencesTest(){
- Long clusterPhyId = 2L;
String topic = "__consumer_offsets";
String metricName = "HealthCheckPassed";
Float metricValue = 2f;
@@ -142,7 +140,7 @@ public class TopicMetricESDAOTest extends KnowStreamApplicationTest {
Long endTime = System.currentTimeMillis();
Long startTime = endTime - 4 * 60 * 60 * 1000;
- Integer i = topicMetricESDAO.countMetricValue(clusterPhyId, topic, searchMatch, startTime, endTime);
+ Integer i = topicMetricESDAO.countMetricValue(clusterId, topic, searchMatch, startTime, endTime);
assert null != i;
}
diff --git a/km-rest/src/test/java/com/xiaojukeji/know/streaming/test/kafka/KafkaContainerTest.java b/km-rest/src/test/java/com/xiaojukeji/know/streaming/test/kafka/KafkaContainerTest.java
new file mode 100644
index 00000000..49291ec9
--- /dev/null
+++ b/km-rest/src/test/java/com/xiaojukeji/know/streaming/test/kafka/KafkaContainerTest.java
@@ -0,0 +1,47 @@
+package com.xiaojukeji.know.streaming.test.kafka;
+
+import com.xiaojukeji.know.streaming.test.kafka.env.KafkaEnv;
+import com.xiaojukeji.know.streaming.test.km.env.KMEnv;
+import org.testcontainers.containers.KafkaContainer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+public class KafkaContainerTest implements KafkaEnv {
+ private static final String KAFKA_VERSION = "7.3.1";
+ private static final DockerImageName KAFKA_IMAGE = DockerImageName.parse(
+ "confluentinc/cp-kafka" + KMEnv.SEPARATOR + KAFKA_VERSION);
+ static KafkaContainer KAFKA_CONTAINER = new KafkaContainer(KAFKA_IMAGE)
+ .withEnv("TZ", "Asia/Shanghai");
+
+ @Override
+ public void init() {
+ Startables.deepStart(KAFKA_CONTAINER).join();
+ }
+
+ @Override
+ public void cleanup() {
+ /*
+ * 不需要手动调用清理容器
+ * 1. test执行结束后testcontainer会清理容器
+ * 2. junit5的@AfterAll方法会在SpringBoot生命周期结束前执行,导致数据库连接无法关闭
+ **/
+// if (KAFKA_CONTAINER != null) {
+// KAFKA_CONTAINER.close();
+// }
+ }
+
+ @Override
+ public String getBootstrapServers() {
+ return KAFKA_CONTAINER.getBootstrapServers();
+ }
+
+ @Override
+ public String getZKUrl() {
+ return String.format("%s:%d", KAFKA_CONTAINER.getHost(), KAFKA_CONTAINER.getMappedPort(2181));
+ }
+
+ @Override
+ public String getVersion() {
+ return KAFKA_VERSION;
+ }
+}
diff --git a/km-rest/src/test/java/com/xiaojukeji/know/streaming/test/kafka/env/KafkaEnv.java b/km-rest/src/test/java/com/xiaojukeji/know/streaming/test/kafka/env/KafkaEnv.java
new file mode 100644
index 00000000..e35c4d5b
--- /dev/null
+++ b/km-rest/src/test/java/com/xiaojukeji/know/streaming/test/kafka/env/KafkaEnv.java
@@ -0,0 +1,13 @@
+package com.xiaojukeji.know.streaming.test.kafka.env;
+
+public interface KafkaEnv {
+ void init();
+
+ void cleanup();
+
+ String getBootstrapServers();
+
+ String getZKUrl();
+
+ String getVersion();
+}
diff --git a/km-rest/src/test/java/com/xiaojukeji/know/streaming/test/km/KMBase.java b/km-rest/src/test/java/com/xiaojukeji/know/streaming/test/km/KMBase.java
new file mode 100644
index 00000000..8383a009
--- /dev/null
+++ b/km-rest/src/test/java/com/xiaojukeji/know/streaming/test/km/KMBase.java
@@ -0,0 +1,69 @@
+package com.xiaojukeji.know.streaming.test.km;
+
+import com.xiaojukeji.know.streaming.test.kafka.KafkaContainerTest;
+import com.xiaojukeji.know.streaming.test.kafka.env.KafkaEnv;
+import com.xiaojukeji.know.streaming.test.km.contrainer.KMContainer;
+import com.xiaojukeji.know.streaming.test.km.env.KMEnv;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.springframework.test.context.DynamicPropertyRegistry;
+import org.springframework.test.context.DynamicPropertySource;
+
+public abstract class KMBase {
+ private static KMEnv kmEnv;
+ private static KafkaEnv kafkaEnv;
+
+ @BeforeAll
+ static void init() {
+ if (container()) {
+ kmEnv = new KMContainer();
+ kmEnv.init();
+
+ if (kmEnv.kafka()) {
+ kafkaEnv = new KafkaContainerTest();
+ kafkaEnv.init();
+ }
+ }
+ }
+
+
+ @DynamicPropertySource
+ static void setUp(DynamicPropertyRegistry registry) {
+ registry.add("spring.datasource.know-streaming.jdbc-url", KMBase.kmEnv.jdbcUrl());
+
+ registry.add("spring.logi-job.jdbc-url", KMBase.kmEnv.jdbcUrl());
+
+ registry.add("spring.logi-security.jdbc-url", KMBase.kmEnv.jdbcUrl());
+
+ registry.add("spring.logi-security.jdbc-url", KMBase.kmEnv.jdbcUrl());
+
+ registry.add("es.client.address", KMBase.kmEnv.esUrl());
+ }
+
+
+ @AfterAll
+ static void destroy() {
+ if (kmEnv != null) {
+ kmEnv.cleanup();
+ }
+ if (kafkaEnv != null) {
+ kafkaEnv.cleanup();
+ }
+ }
+
+ static boolean container() {
+ return true;
+ }
+
+ protected String kafkaVersion() {
+ return kafkaEnv.getVersion();
+ }
+
+ protected String bootstrapServers() {
+ return kafkaEnv.getBootstrapServers();
+ }
+
+ protected String zookeeperUrl() {
+ return kafkaEnv.getZKUrl();
+ }
+}
diff --git a/km-rest/src/test/java/com/xiaojukeji/know/streaming/test/km/contrainer/KMContainer.java b/km-rest/src/test/java/com/xiaojukeji/know/streaming/test/km/contrainer/KMContainer.java
new file mode 100644
index 00000000..78bb52b5
--- /dev/null
+++ b/km-rest/src/test/java/com/xiaojukeji/know/streaming/test/km/contrainer/KMContainer.java
@@ -0,0 +1,92 @@
+package com.xiaojukeji.know.streaming.test.km.contrainer;
+
+import com.xiaojukeji.know.streaming.test.km.env.KMEnv;
+import org.jetbrains.annotations.NotNull;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.elasticsearch.ElasticsearchContainer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.function.Supplier;
+
+public class KMContainer implements KMEnv {
+ private static final String ES_VERSION = "7.6.2";
+ private static final String LATEST_VERSION = "latest";
+
+ private static final String DB_PROPERTY = "?useUnicode=true" +
+ "&characterEncoding=utf8" +
+ "&jdbcCompliantTruncation=true" +
+ "&allowMultiQueries=true" +
+ "&useSSL=false" +
+ "&alwaysAutoGeneratedKeys=true" +
+ "&serverTimezone=GMT%2B8" +
+ "&allowPublicKeyRetrieval=true";
+ private static final DockerImageName ES_IMAGE = DockerImageName.parse(
+ "docker.io/library/elasticsearch" + KMEnv.SEPARATOR + ES_VERSION)
+ .asCompatibleSubstituteFor("docker.elastic.co/elasticsearch/elasticsearch");
+ private static final DockerImageName MYSQL_IMAGE = DockerImageName.parse(
+ "knowstreaming/knowstreaming-mysql" + KMEnv.SEPARATOR + LATEST_VERSION)
+ .asCompatibleSubstituteFor("mysql");
+
+ private static final DockerImageName INIT_IMAGE = DockerImageName.parse(
+ "knowstreaming/knowstreaming-manager" + KMEnv.SEPARATOR + LATEST_VERSION);
+
+ private static final ElasticsearchContainer ES_CONTAINER = new ElasticsearchContainer(ES_IMAGE)
+// .withImagePullPolicy(PullPolicy.alwaysPull())
+ .withEnv("TZ", "Asia/Shanghai")
+ .withEnv("ES_JAVA_OPTS", "-Xms512m -Xmx512m")
+ .withEnv("discovery.type", "single-node");
+ private static final GenericContainer> INIT_CONTAINER = new GenericContainer<>(INIT_IMAGE)
+ .withEnv("TZ", "Asia/Shanghai")
+ .withCommand("/bin/bash", "/es_template_create.sh")
+ .dependsOn(ES_CONTAINER);
+ private static final MySQLContainer> MYSQL_CONTAINER = new MySQLContainer<>(MYSQL_IMAGE)
+ .withEnv("MYSQL_ROOT_HOST", "%")
+ .withEnv("TZ", "Asia/Shanghai")
+ .withDatabaseName("know_streaming")
+ .withUsername("root")
+ .withPassword("mysql_pass");
+
+
+ @NotNull
+ public Supplier