mirror of
https://github.com/didi/KnowStreaming.git
synced 2025-12-24 20:22:12 +08:00
storage support s3
This commit is contained in:
@@ -2,6 +2,7 @@ package com.xiaojukeji.kafka.manager.common.utils;
|
||||
|
||||
import org.apache.commons.lang.StringUtils;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
@@ -11,6 +12,20 @@ import java.util.Set;
|
||||
* @date 20/4/16
|
||||
*/
|
||||
public class ValidateUtils {
|
||||
/**
|
||||
* 任意一个为空, 则返回true
|
||||
*/
|
||||
public static boolean anyNull(Object... objects) {
|
||||
return Arrays.stream(objects).anyMatch(ValidateUtils::isNull);
|
||||
}
|
||||
|
||||
/**
|
||||
* 是空字符串或者空
|
||||
*/
|
||||
public static boolean anyBlank(String... strings) {
|
||||
return Arrays.stream(strings).anyMatch(StringUtils::isBlank);
|
||||
}
|
||||
|
||||
/**
|
||||
* 为空
|
||||
*/
|
||||
|
||||
@@ -68,5 +68,10 @@
|
||||
<artifactId>spring-test</artifactId>
|
||||
<version>${spring-version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>io.minio</groupId>
|
||||
<artifactId>minio</artifactId>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
</project>
|
||||
@@ -4,6 +4,7 @@ import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.KafkaFileDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.KafkaFileDO;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
@@ -24,7 +25,7 @@ public interface KafkaFileService {
|
||||
|
||||
KafkaFileDO getFileByFileName(String fileName);
|
||||
|
||||
Result<String> downloadKafkaConfigFile(Long fileId);
|
||||
Result<MultipartFile> downloadKafkaFile(Long fileId);
|
||||
|
||||
String getDownloadBaseUrl();
|
||||
}
|
||||
|
||||
@@ -10,13 +10,20 @@ import org.springframework.web.multipart.MultipartFile;
|
||||
public abstract class AbstractStorageService {
|
||||
/**
|
||||
* 上传
|
||||
* @param fileName 文件名
|
||||
* @param fileMd5 文件md5
|
||||
* @param uploadFile 文件
|
||||
* @return 上传结果
|
||||
*/
|
||||
public abstract boolean upload(String fileName, String fileMd5, MultipartFile uploadFile);
|
||||
|
||||
/**
|
||||
* 下载
|
||||
* 下载文件
|
||||
* @param fileName 文件名
|
||||
* @param fileMd5 文件md5
|
||||
* @return 文件
|
||||
*/
|
||||
public abstract Result<String> download(String fileName, String fileMd5);
|
||||
public abstract Result<MultipartFile> download(String fileName, String fileMd5);
|
||||
|
||||
/**
|
||||
* 下载base地址
|
||||
|
||||
@@ -1,33 +0,0 @@
|
||||
package com.xiaojukeji.kafka.manager.kcm.component.storage.local;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Service;
|
||||
import com.xiaojukeji.kafka.manager.kcm.component.storage.AbstractStorageService;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
/**
|
||||
* @author zengqiao
|
||||
* @date 20/9/17
|
||||
*/
|
||||
@Service("storageService")
|
||||
public class Local extends AbstractStorageService {
|
||||
@Value("${kcm.storage.base-url}")
|
||||
private String baseUrl;
|
||||
|
||||
@Override
|
||||
public boolean upload(String fileName, String fileMd5, MultipartFile uploadFile) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<String> download(String fileName, String fileMd5) {
|
||||
return Result.buildFrom(ResultStatus.STORAGE_DOWNLOAD_FILE_FAILED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDownloadBaseUrl() {
|
||||
return baseUrl;
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,125 @@
|
||||
package com.xiaojukeji.kafka.manager.kcm.component.storage.s3;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.kcm.component.storage.AbstractStorageService;
|
||||
import io.minio.*;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.mock.web.MockMultipartFile;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import javax.annotation.PostConstruct;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
|
||||
@Service("storageService")
|
||||
public class S3Service extends AbstractStorageService {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(S3Service.class);
|
||||
|
||||
@Value("${kcm.s3.endpoint:}")
|
||||
private String endpoint;
|
||||
|
||||
@Value("${kcm.s3.access-key:}")
|
||||
private String accessKey;
|
||||
|
||||
@Value("${kcm.s3.secret-key:}")
|
||||
private String secretKey;
|
||||
|
||||
@Value("${kcm.s3.bucket:}")
|
||||
private String bucket;
|
||||
|
||||
private MinioClient minioClient;
|
||||
|
||||
@PostConstruct
|
||||
public void init() {
|
||||
try {
|
||||
if (ValidateUtils.anyBlank(this.endpoint, this.accessKey, this.secretKey, this.bucket)) {
|
||||
// without config s3
|
||||
return;
|
||||
}
|
||||
minioClient = new MinioClient(endpoint, accessKey, secretKey);
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("class=S3Service||method=init||fields={}||errMsg={}", this.toString(), e.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean upload(String fileName, String fileMd5, MultipartFile uploadFile) {
|
||||
InputStream inputStream = null;
|
||||
try {
|
||||
if (!createBucketIfNotExist()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
inputStream = uploadFile.getInputStream();
|
||||
minioClient.putObject(PutObjectArgs.builder()
|
||||
.bucket(this.bucket)
|
||||
.object(fileName)
|
||||
.stream(inputStream, inputStream.available(), -1)
|
||||
.build()
|
||||
);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("class=S3Service||method=upload||fileName={}||errMsg={}||msg=upload failed", fileName, e.getMessage());
|
||||
} finally {
|
||||
if (inputStream != null) {
|
||||
try {
|
||||
inputStream.close();
|
||||
} catch (IOException e) {
|
||||
; // ignore
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<MultipartFile> download(String fileName, String fileMd5) {
|
||||
try {
|
||||
final ObjectStat stat = minioClient.statObject(this.bucket, fileName);
|
||||
|
||||
InputStream is = minioClient.getObject(this.bucket, fileName);
|
||||
|
||||
return Result.buildSuc(new MockMultipartFile(fileName, fileName, stat.contentType(), is));
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("class=S3Service||method=download||fileName={}||errMsg={}||msg=download failed", fileName, e.getMessage());
|
||||
}
|
||||
return Result.buildFrom(ResultStatus.STORAGE_DOWNLOAD_FILE_FAILED);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getDownloadBaseUrl() {
|
||||
return this.endpoint + "/" + this.bucket;
|
||||
}
|
||||
|
||||
private boolean createBucketIfNotExist() {
|
||||
try {
|
||||
boolean found = minioClient.bucketExists(BucketExistsArgs.builder().bucket(this.bucket).build());
|
||||
if (!found) {
|
||||
minioClient.makeBucket(MakeBucketArgs.builder().bucket(this.bucket).build());
|
||||
}
|
||||
|
||||
LOGGER.info("class=S3Service||method=createBucketIfNotExist||bucket={}||msg=check and create bucket success", this.bucket);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("class=S3Service||method=createBucketIfNotExist||bucket={}||errMsg={}||msg=create bucket failed", this.bucket, e.getMessage());
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "S3Service{" +
|
||||
"endpoint='" + endpoint + '\'' +
|
||||
", accessKey='" + accessKey + '\'' +
|
||||
", secretKey='" + secretKey + '\'' +
|
||||
", bucket='" + bucket + '\'' +
|
||||
'}';
|
||||
}
|
||||
}
|
||||
@@ -4,17 +4,18 @@ import com.xiaojukeji.kafka.manager.common.bizenum.KafkaFileEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.ResultStatus;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.KafkaFileDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.KafkaFileDO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.CopyUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.dao.KafkaFileDao;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.KafkaFileDO;
|
||||
import com.xiaojukeji.kafka.manager.kcm.component.storage.AbstractStorageService;
|
||||
import com.xiaojukeji.kafka.manager.kcm.KafkaFileService;
|
||||
import com.xiaojukeji.kafka.manager.kcm.component.storage.AbstractStorageService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.dao.DuplicateKeyException;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
@@ -163,7 +164,7 @@ public class KafkaFileServiceImpl implements KafkaFileService {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Result<String> downloadKafkaConfigFile(Long fileId) {
|
||||
public Result<MultipartFile> downloadKafkaFile(Long fileId) {
|
||||
KafkaFileDO kafkaFileDO = kafkaFileDao.getById(fileId);
|
||||
if (ValidateUtils.isNull(kafkaFileDO)) {
|
||||
return Result.buildFrom(ResultStatus.RESOURCE_NOT_EXIST);
|
||||
|
||||
@@ -1,23 +1,30 @@
|
||||
package com.xiaojukeji.kafka.manager.web.api.versionone.rd;
|
||||
|
||||
import com.xiaojukeji.kafka.manager.common.bizenum.KafkaFileEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.Result;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.dto.normal.KafkaFileDTO;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.vo.rd.KafkaFileVO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.kcm.component.storage.common.StorageEnum;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.pojo.KafkaFileDO;
|
||||
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
|
||||
import com.xiaojukeji.kafka.manager.kcm.KafkaFileService;
|
||||
import com.xiaojukeji.kafka.manager.common.entity.vo.rd.KafkaFileVO;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.JsonUtils;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.SpringTool;
|
||||
import com.xiaojukeji.kafka.manager.common.constant.ApiPrefix;
|
||||
import com.xiaojukeji.kafka.manager.common.utils.ValidateUtils;
|
||||
import com.xiaojukeji.kafka.manager.kcm.KafkaFileService;
|
||||
import com.xiaojukeji.kafka.manager.kcm.component.storage.common.StorageEnum;
|
||||
import com.xiaojukeji.kafka.manager.service.service.ClusterService;
|
||||
import com.xiaojukeji.kafka.manager.web.converters.KafkaFileConverter;
|
||||
import io.swagger.annotations.Api;
|
||||
import io.swagger.annotations.ApiOperation;
|
||||
import org.apache.tomcat.util.http.fileupload.IOUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Autowired;
|
||||
import org.springframework.web.bind.annotation.*;
|
||||
import org.springframework.web.multipart.MultipartFile;
|
||||
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
import java.io.InputStream;
|
||||
import java.net.URLEncoder;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@@ -30,6 +37,8 @@ import java.util.Map;
|
||||
@RestController
|
||||
@RequestMapping(ApiPrefix.API_V1_RD_PREFIX)
|
||||
public class RdKafkaFileController {
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(RdKafkaFileController.class);
|
||||
|
||||
@Autowired
|
||||
private ClusterService clusterService;
|
||||
|
||||
@@ -71,9 +80,33 @@ public class RdKafkaFileController {
|
||||
return new Result<>(KafkaFileConverter.convertKafkaFileVOList(kafkaFileDOList, clusterService));
|
||||
}
|
||||
|
||||
@ApiOperation(value = "文件预览", notes = "")
|
||||
@Deprecated
|
||||
@ApiOperation(value = "文件下载", notes = "")
|
||||
@RequestMapping(value = "kafka-files/{fileId}/config-files", method = RequestMethod.GET)
|
||||
public Result<String> previewKafkaFile(@PathVariable("fileId") Long fileId) {
|
||||
return kafkaFileService.downloadKafkaConfigFile(fileId);
|
||||
public Result downloadKafkaFile(@PathVariable("fileId") Long fileId, HttpServletResponse response) {
|
||||
Result<MultipartFile> multipartFileResult = kafkaFileService.downloadKafkaFile(fileId);
|
||||
|
||||
if (multipartFileResult.failed() || ValidateUtils.isNull(multipartFileResult.getData())) {
|
||||
return multipartFileResult;
|
||||
}
|
||||
|
||||
InputStream is = null;
|
||||
try {
|
||||
response.setContentType(multipartFileResult.getData().getContentType());
|
||||
response.setCharacterEncoding("UTF-8");
|
||||
response.setHeader("Content-Disposition", "attachment;filename=" + URLEncoder.encode(multipartFileResult.getData().getOriginalFilename(), "UTF-8"));
|
||||
is = multipartFileResult.getData().getInputStream();
|
||||
IOUtils.copy(is, response.getOutputStream());
|
||||
} catch (Exception e) {
|
||||
LOGGER.error("class=RdKafkaFileController||method=downloadKafkaFile||fileId={}||errMsg={}||msg=modify response failed", fileId, e.getMessage());
|
||||
} finally {
|
||||
try {
|
||||
if (is != null) {
|
||||
is.close();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
}
|
||||
}
|
||||
return Result.buildSuc();
|
||||
}
|
||||
}
|
||||
@@ -52,8 +52,11 @@ account:
|
||||
|
||||
kcm:
|
||||
enabled: false
|
||||
storage:
|
||||
base-url: http://127.0.0.1
|
||||
s3:
|
||||
endpoint: 127.0.0.1
|
||||
access-key: 1234567890
|
||||
secret-key: 0987654321
|
||||
bucket: logi-kafka
|
||||
n9e:
|
||||
base-url: http://127.0.0.1:8004
|
||||
user-token: 12345678
|
||||
|
||||
Reference in New Issue
Block a user