diff --git a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/ValidateUtils.java b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/ValidateUtils.java
index 1ece8f9f..6bd0c55c 100644
--- a/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/ValidateUtils.java
+++ b/kafka-manager-common/src/main/java/com/xiaojukeji/kafka/manager/common/utils/ValidateUtils.java
@@ -2,6 +2,7 @@ package com.xiaojukeji.kafka.manager.common.utils;
import org.apache.commons.lang.StringUtils;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -11,6 +12,20 @@ import java.util.Set;
* @date 20/4/16
*/
public class ValidateUtils {
+ /**
+ * 任意一个为空, 则返回true
+ */
+ public static boolean anyNull(Object... objects) {
+ return Arrays.stream(objects).anyMatch(ValidateUtils::isNull);
+ }
+
+ /**
+ * 是空字符串或者空
+ */
+ public static boolean anyBlank(String... strings) {
+ return Arrays.stream(strings).anyMatch(StringUtils::isBlank);
+ }
+
/**
* 为空
*/
diff --git a/kafka-manager-extends/kafka-manager-kcm/pom.xml b/kafka-manager-extends/kafka-manager-kcm/pom.xml
index 741f0f12..7ffd00e3 100644
--- a/kafka-manager-extends/kafka-manager-kcm/pom.xml
+++ b/kafka-manager-extends/kafka-manager-kcm/pom.xml
@@ -68,5 +68,10 @@
spring-test
${spring-version}
+
+
+ io.minio
+ minio
+
\ No newline at end of file
diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/KafkaFileService.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/KafkaFileService.java
index b2de3a32..babfeb15 100644
--- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/KafkaFileService.java
+++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/KafkaFileService.java
@@ -4,6 +4,7 @@ import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.KafkaFileDTO;
import com.xiaojukeji.kafka.manager.common.entity.pojo.KafkaFileDO;
+import org.springframework.web.multipart.MultipartFile;
import java.util.List;
@@ -24,7 +25,7 @@ public interface KafkaFileService {
KafkaFileDO getFileByFileName(String fileName);
- Result downloadKafkaConfigFile(Long fileId);
+ Result downloadKafkaFile(Long fileId);
String getDownloadBaseUrl();
}
diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/AbstractStorageService.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/AbstractStorageService.java
index 90192b0b..34c209ac 100644
--- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/AbstractStorageService.java
+++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/AbstractStorageService.java
@@ -10,13 +10,20 @@ import org.springframework.web.multipart.MultipartFile;
public abstract class AbstractStorageService {
/**
* 上传
+ * @param fileName 文件名
+ * @param fileMd5 文件md5
+ * @param uploadFile 文件
+ * @return 上传结果
*/
public abstract boolean upload(String fileName, String fileMd5, MultipartFile uploadFile);
/**
- * 下载
+ * 下载文件
+ * @param fileName 文件名
+ * @param fileMd5 文件md5
+ * @return 文件
*/
- public abstract Result download(String fileName, String fileMd5);
+ public abstract Result download(String fileName, String fileMd5);
/**
* 下载base地址
diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/local/Local.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/local/Local.java
deleted file mode 100644
index 992c09e4..00000000
--- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/local/Local.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package com.xiaojukeji.kafka.manager.kcm.component.storage.local;
-
-import com.xiaojukeji.kafka.manager.common.entity.Result;
-import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Service;
-import com.xiaojukeji.kafka.manager.kcm.component.storage.AbstractStorageService;
-import org.springframework.web.multipart.MultipartFile;
-
-/**
- * @author zengqiao
- * @date 20/9/17
- */
-@Service("storageService")
-public class Local extends AbstractStorageService {
- @Value("${kcm.storage.base-url}")
- private String baseUrl;
-
- @Override
- public boolean upload(String fileName, String fileMd5, MultipartFile uploadFile) {
- return false;
- }
-
- @Override
- public Result download(String fileName, String fileMd5) {
- return Result.buildFrom(ResultStatus.STORAGE_DOWNLOAD_FILE_FAILED);
- }
-
- @Override
- public String getDownloadBaseUrl() {
- return baseUrl;
- }
-}
\ No newline at end of file
diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/s3/S3Service.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/s3/S3Service.java
new file mode 100644
index 00000000..419e66e0
--- /dev/null
+++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/component/storage/s3/S3Service.java
@@ -0,0 +1,125 @@
+package com.xiaojukeji.kafka.manager.kcm.component.storage.s3;
+
+import com.xiaojukeji.kafka.manager.common.entity.Result;
+import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
+import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
+import com.xiaojukeji.kafka.manager.kcm.component.storage.AbstractStorageService;
+import io.minio.*;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.mock.web.MockMultipartFile;
+import org.springframework.stereotype.Service;
+import org.springframework.web.multipart.MultipartFile;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.PostConstruct;
+import java.io.IOException;
+import java.io.InputStream;
+
+
+@Service("storageService")
+public class S3Service extends AbstractStorageService {
+ private final static Logger LOGGER = LoggerFactory.getLogger(S3Service.class);
+
+ @Value("${kcm.s3.endpoint:}")
+ private String endpoint;
+
+ @Value("${kcm.s3.access-key:}")
+ private String accessKey;
+
+ @Value("${kcm.s3.secret-key:}")
+ private String secretKey;
+
+ @Value("${kcm.s3.bucket:}")
+ private String bucket;
+
+ private MinioClient minioClient;
+
+ @PostConstruct
+ public void init() {
+ try {
+ if (ValidateUtils.anyBlank(this.endpoint, this.accessKey, this.secretKey, this.bucket)) {
+ // without config s3
+ return;
+ }
+ minioClient = new MinioClient(endpoint, accessKey, secretKey);
+ } catch (Exception e) {
+ LOGGER.error("class=S3Service||method=init||fields={}||errMsg={}", this.toString(), e.getMessage());
+ }
+ }
+
+ @Override
+ public boolean upload(String fileName, String fileMd5, MultipartFile uploadFile) {
+ InputStream inputStream = null;
+ try {
+ if (!createBucketIfNotExist()) {
+ return false;
+ }
+
+ inputStream = uploadFile.getInputStream();
+ minioClient.putObject(PutObjectArgs.builder()
+ .bucket(this.bucket)
+ .object(fileName)
+ .stream(inputStream, inputStream.available(), -1)
+ .build()
+ );
+ return true;
+ } catch (Exception e) {
+ LOGGER.error("class=S3Service||method=upload||fileName={}||errMsg={}||msg=upload failed", fileName, e.getMessage());
+ } finally {
+ if (inputStream != null) {
+ try {
+ inputStream.close();
+ } catch (IOException e) {
+ ; // ignore
+ }
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public Result download(String fileName, String fileMd5) {
+ try {
+ final ObjectStat stat = minioClient.statObject(this.bucket, fileName);
+
+ InputStream is = minioClient.getObject(this.bucket, fileName);
+
+ return Result.buildSuc(new MockMultipartFile(fileName, fileName, stat.contentType(), is));
+ } catch (Exception e) {
+ LOGGER.error("class=S3Service||method=download||fileName={}||errMsg={}||msg=download failed", fileName, e.getMessage());
+ }
+ return Result.buildFrom(ResultStatus.STORAGE_DOWNLOAD_FILE_FAILED);
+ }
+
+ @Override
+ public String getDownloadBaseUrl() {
+ return this.endpoint + "/" + this.bucket;
+ }
+
+ private boolean createBucketIfNotExist() {
+ try {
+ boolean found = minioClient.bucketExists(BucketExistsArgs.builder().bucket(this.bucket).build());
+ if (!found) {
+ minioClient.makeBucket(MakeBucketArgs.builder().bucket(this.bucket).build());
+ }
+
+ LOGGER.info("class=S3Service||method=createBucketIfNotExist||bucket={}||msg=check and create bucket success", this.bucket);
+ return true;
+ } catch (Exception e) {
+ LOGGER.error("class=S3Service||method=createBucketIfNotExist||bucket={}||errMsg={}||msg=create bucket failed", this.bucket, e.getMessage());
+ }
+ return false;
+ }
+
+ @Override
+ public String toString() {
+ return "S3Service{" +
+ "endpoint='" + endpoint + '\'' +
+ ", accessKey='" + accessKey + '\'' +
+ ", secretKey='" + secretKey + '\'' +
+ ", bucket='" + bucket + '\'' +
+ '}';
+ }
+}
diff --git a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/impl/KafkaFileServiceImpl.java b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/impl/KafkaFileServiceImpl.java
index 307c486c..37f8753a 100644
--- a/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/impl/KafkaFileServiceImpl.java
+++ b/kafka-manager-extends/kafka-manager-kcm/src/main/java/com/xiaojukeji/kafka/manager/kcm/impl/KafkaFileServiceImpl.java
@@ -4,17 +4,18 @@ import com.xiaojukeji.kafka.manager.common.bizenum.KafkaFileEnum;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.KafkaFileDTO;
+import com.xiaojukeji.kafka.manager.common.entity.pojo.KafkaFileDO;
import com.xiaojukeji.kafka.manager.common.utils.CopyUtils;
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
import com.xiaojukeji.kafka.manager.dao.KafkaFileDao;
-import com.xiaojukeji.kafka.manager.common.entity.pojo.KafkaFileDO;
-import com.xiaojukeji.kafka.manager.kcm.component.storage.AbstractStorageService;
import com.xiaojukeji.kafka.manager.kcm.KafkaFileService;
+import com.xiaojukeji.kafka.manager.kcm.component.storage.AbstractStorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DuplicateKeyException;
import org.springframework.stereotype.Service;
+import org.springframework.web.multipart.MultipartFile;
import java.util.ArrayList;
import java.util.List;
@@ -163,7 +164,7 @@ public class KafkaFileServiceImpl implements KafkaFileService {
}
@Override
- public Result downloadKafkaConfigFile(Long fileId) {
+ public Result downloadKafkaFile(Long fileId) {
KafkaFileDO kafkaFileDO = kafkaFileDao.getById(fileId);
if (ValidateUtils.isNull(kafkaFileDO)) {
return Result.buildFrom(ResultStatus.RESOURCE_NOT_EXIST);
diff --git a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdKafkaFileController.java b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdKafkaFileController.java
index 823bbe70..009d540a 100644
--- a/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdKafkaFileController.java
+++ b/kafka-manager-web/src/main/java/com/xiaojukeji/kafka/manager/web/api/versionone/rd/RdKafkaFileController.java
@@ -1,23 +1,30 @@
package com.xiaojukeji.kafka.manager.web.api.versionone.rd;
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaFileEnum;
+import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
import com.xiaojukeji.kafka.manager.common.entity.Result;
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.KafkaFileDTO;
-import com.xiaojukeji.kafka.manager.common.entity.vo.rd.KafkaFileVO;
-import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
-import com.xiaojukeji.kafka.manager.kcm.component.storage.common.StorageEnum;
import com.xiaojukeji.kafka.manager.common.entity.pojo.KafkaFileDO;
-import com.xiaojukeji.kafka.manager.service.service.ClusterService;
-import com.xiaojukeji.kafka.manager.kcm.KafkaFileService;
+import com.xiaojukeji.kafka.manager.common.entity.vo.rd.KafkaFileVO;
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
-import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
+import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
+import com.xiaojukeji.kafka.manager.kcm.KafkaFileService;
+import com.xiaojukeji.kafka.manager.kcm.component.storage.common.StorageEnum;
+import com.xiaojukeji.kafka.manager.service.service.ClusterService;
import com.xiaojukeji.kafka.manager.web.converters.KafkaFileConverter;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
+import org.apache.tomcat.util.http.fileupload.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
+import org.springframework.web.multipart.MultipartFile;
+import javax.servlet.http.HttpServletResponse;
+import java.io.InputStream;
+import java.net.URLEncoder;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -30,6 +37,8 @@ import java.util.Map;
@RestController
@RequestMapping(ApiPrefix.API_V1_RD_PREFIX)
public class RdKafkaFileController {
+ private final static Logger LOGGER = LoggerFactory.getLogger(RdKafkaFileController.class);
+
@Autowired
private ClusterService clusterService;
@@ -71,9 +80,33 @@ public class RdKafkaFileController {
return new Result<>(KafkaFileConverter.convertKafkaFileVOList(kafkaFileDOList, clusterService));
}
- @ApiOperation(value = "文件预览", notes = "")
+ @Deprecated
+ @ApiOperation(value = "文件下载", notes = "")
@RequestMapping(value = "kafka-files/{fileId}/config-files", method = RequestMethod.GET)
- public Result previewKafkaFile(@PathVariable("fileId") Long fileId) {
- return kafkaFileService.downloadKafkaConfigFile(fileId);
+ public Result downloadKafkaFile(@PathVariable("fileId") Long fileId, HttpServletResponse response) {
+ Result multipartFileResult = kafkaFileService.downloadKafkaFile(fileId);
+
+ if (multipartFileResult.failed() || ValidateUtils.isNull(multipartFileResult.getData())) {
+ return multipartFileResult;
+ }
+
+ InputStream is = null;
+ try {
+ response.setContentType(multipartFileResult.getData().getContentType());
+ response.setCharacterEncoding("UTF-8");
+ response.setHeader("Content-Disposition", "attachment;filename=" + URLEncoder.encode(multipartFileResult.getData().getOriginalFilename(), "UTF-8"));
+ is = multipartFileResult.getData().getInputStream();
+ IOUtils.copy(is, response.getOutputStream());
+ } catch (Exception e) {
+ LOGGER.error("class=RdKafkaFileController||method=downloadKafkaFile||fileId={}||errMsg={}||msg=modify response failed", fileId, e.getMessage());
+ } finally {
+ try {
+ if (is != null) {
+ is.close();
+ }
+ } catch (Exception e) {
+ }
+ }
+ return Result.buildSuc();
}
}
\ No newline at end of file
diff --git a/kafka-manager-web/src/main/resources/application.yml b/kafka-manager-web/src/main/resources/application.yml
index 8c137da8..5b01d321 100644
--- a/kafka-manager-web/src/main/resources/application.yml
+++ b/kafka-manager-web/src/main/resources/application.yml
@@ -52,8 +52,11 @@ account:
kcm:
enabled: false
- storage:
- base-url: http://127.0.0.1
+ s3:
+ endpoint: 127.0.0.1
+ access-key: 1234567890
+ secret-key: 0987654321
+ bucket: logi-kafka
n9e:
base-url: http://127.0.0.1:8004
user-token: 12345678
diff --git a/pom.xml b/pom.xml
index 7165880e..6588d335 100644
--- a/pom.xml
+++ b/pom.xml
@@ -223,6 +223,12 @@
curator-recipes
2.10.0
+
+
+ io.minio
+ minio
+ 7.1.0
+
\ No newline at end of file