一、后端:
1.引入pom
<dependency><groupId>com.amazonaws</groupId><artifactId>aws-java-sdk-s3</artifactId><version>1.12.263</version></dependency>
2.配置application.yml
jeecg:minio:minioUrl: http://localhost:9000minioName: minioadminminioPass: Aa123456@adminbucketName: exam-bucket
3.aws配置
package com.ynfy.config;import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class AmazonS3Config {@Value(value = "${jeecg.minio.minioUrl}")private String minioUrl;@Value(value = "${jeecg.minio.minioName}")private String minioName;@Value(value = "${jeecg.minio.minioPass}")private String minioPass;@Bean(name = "amazonS3Client")public AmazonS3 amazonS3Client() {//设置连接时的参数ClientConfiguration config = new ClientConfiguration();//设置连接方式为HTTP,可选参数为HTTP和HTTPSconfig.setProtocol(Protocol.HTTP);//设置网络访问超时时间config.setConnectionTimeout(5000);config.setUseExpectContinue(true);AWSCredentials credentials = new BasicAWSCredentials(minioName, minioPass);//设置EndpointAwsClientBuilder.EndpointConfiguration endPoint = new AwsClientBuilder.EndpointConfiguration(minioUrl, Regions.US_EAST_1.name());AmazonS3 amazonS3 = AmazonS3ClientBuilder.standard().withClientConfiguration(config).withCredentials(new AWSStaticCredentialsProvider(credentials)).withEndpointConfiguration(endPoint).withPathStyleAccessEnabled(true).build();return amazonS3;}}
4.新建数据库表aws_s3_upload
用于记录上传的文件信息:md5值,对象key等信息。
5.接口IAwsS3UploadService:
package com.ynfy.buss.awss3upload.service;import com.baomidou.mybatisplus.extension.service.IService;
import com.ynfy.buss.awss3upload.entity.AwsS3Upload;
import com.ynfy.buss.awss3upload.entity.TaskParam;
import com.ynfy.buss.awss3upload.entity.dto.TaskInfoDTO;import java.util.Map;/*** @Description: AWS.S3 大文件分片上传* @Author: jeecg-boot* @Date: 2024-10-31* @Version: V1.0*/
public interface IAwsS3UploadService extends IService<AwsS3Upload> {/*** 根据md5标识获取分片上传任务** @param identifier* @return*/AwsS3Upload getByIdentifier(String identifier);/*** 初始化一个任务*/TaskInfoDTO initTask(TaskParam param);/*** 获取上传进度** @param identifier* @return*/TaskInfoDTO getTaskInfo(String identifier);/*** 生成预签名上传url** @param bucket 桶名* @param objectKey 对象的key* @param params 额外的参数* @return*/String genPreSignUploadUrl(String bucket, String objectKey, Map<String, String> params);/*** 合并分片** @param identifier*/void merge(String identifier);
}
6.接口实现类AwsS3UploadServiceImpl:
package com.ynfy.buss.awss3upload.service.impl;import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import com.amazonaws.HttpMethod;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.*;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ynfy.buss.awss3upload.constant.MinioConstant;
import com.ynfy.buss.awss3upload.entity.AwsS3Upload;
import com.ynfy.buss.awss3upload.entity.TaskParam;
import com.ynfy.buss.awss3upload.entity.dto.TaskInfoDTO;
import com.ynfy.buss.awss3upload.entity.dto.TaskRecordDTO;
import com.ynfy.buss.awss3upload.mapper.AwsS3UploadMapper;
import com.ynfy.buss.awss3upload.service.IAwsS3UploadService;
import org.jeecg.common.exception.JeecgBootException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.MediaType;
import org.springframework.http.MediaTypeFactory;
import org.springframework.stereotype.Service;import javax.annotation.Resource;
import java.net.URL;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;/*** @Description: AWS.S3 大文件分片上传* @Author: jeecg-boot* @Date: 2024-10-31* @Version: V1.0*/
@Service
public class AwsS3UploadServiceImpl extends ServiceImpl<AwsS3UploadMapper, AwsS3Upload> implements IAwsS3UploadService {@Value(value = "${jeecg.minio.minioUrl}")private String minioUrl;@Value(value = "${jeecg.minio.minioName}")private String minioName;@Value(value = "${jeecg.minio.minioPass}")private String minioPass;@Value(value = "${jeecg.minio.bucketName}")private String bucketName;@Resourceprivate AmazonS3 amazonS3;/*** 根据md5标识获取分片上传任务** @param identifier* @return*/@Overridepublic AwsS3Upload getByIdentifier(String identifier) {return this.getOne(new QueryWrapper<AwsS3Upload>().lambda().eq(AwsS3Upload::getFileIdentifier, identifier));}/*** 初始化一个任务*/@Overridepublic TaskInfoDTO initTask(TaskParam param) {Date currentDate = new Date();String fileName = param.getFileName();String suffix = fileName.substring(fileName.lastIndexOf(".") + 1, fileName.length());String key = StrUtil.format("{}/{}.{}", DateUtil.format(currentDate, "YYYY/MM/dd"),fileName.substring(0, fileName.lastIndexOf(".")) + "_" + System.currentTimeMillis(), suffix);String contentType = MediaTypeFactory.getMediaType(key).orElse(MediaType.APPLICATION_OCTET_STREAM).toString();ObjectMetadata objectMetadata = new ObjectMetadata();objectMetadata.setContentType(contentType);InitiateMultipartUploadResult initiateMultipartUploadResult = amazonS3.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucketName, key).withObjectMetadata(objectMetadata));String uploadId = initiateMultipartUploadResult.getUploadId();AwsS3Upload task = new AwsS3Upload();int chunkNum = (int) Math.ceil(param.getTotalSize() * 1.0 / param.getChunkSize());task.setBucketName(bucketName);task.setChunkNum(chunkNum);task.setChunkSize(param.getChunkSize());task.setTotalSize(param.getTotalSize());task.setFileIdentifier(param.getIdentifier());task.setFileName(param.getFileName());task.setObjectKey(key);task.setUploadId(uploadId);save(task);return new TaskInfoDTO().setFinished(false).setTaskRecord(TaskRecordDTO.convertFromEntity(task)).setPath(getPath(bucketName, key));}public String getPath(String bucket, String objectKey) {return StrUtil.format("{}/{}/{}", minioUrl, bucket, objectKey);}/*** 获取上传进度** @param identifier* @return*/@Overridepublic TaskInfoDTO getTaskInfo(String identifier) {AwsS3Upload task = getByIdentifier(identifier);if (Objects.isNull(task)) {return null;}TaskInfoDTO result = new TaskInfoDTO().setFinished(true).setTaskRecord(TaskRecordDTO.convertFromEntity(task)).setPath(getPath(task.getBucketName(), task.getObjectKey()));boolean doesObjectExist = amazonS3.doesObjectExist(task.getBucketName(), task.getObjectKey());if (!doesObjectExist) {// 未上传完,返回已上传的分片ListPartsRequest listPartsRequest = new ListPartsRequest(task.getBucketName(), task.getObjectKey(), task.getUploadId());PartListing partListing = amazonS3.listParts(listPartsRequest);result.setFinished(false).getTaskRecord().setExitPartList(partListing.getParts());}return result;}@Overridepublic String genPreSignUploadUrl(String bucket, String objectKey, Map<String, String> params) {Date currentDate = new Date();Date expireDate = DateUtil.offsetMillisecond(currentDate, MinioConstant.PRE_SIGN_URL_EXPIRE.intValue());GeneratePresignedUrlRequest request = new GeneratePresignedUrlRequest(bucket, objectKey).withExpiration(expireDate).withMethod(HttpMethod.PUT);if (!Objects.isNull(params)) {params.forEach((key, val) -> request.addRequestParameter(key, val));}URL preSignedUrl = amazonS3.generatePresignedUrl(request);return preSignedUrl.toString();}@Overridepublic void merge(String identifier) {AwsS3Upload task = getByIdentifier(identifier);if (Objects.isNull(task)) {throw new JeecgBootException("分片任务不存");}ListPartsRequest listPartsRequest = new ListPartsRequest(task.getBucketName(), task.getObjectKey(), task.getUploadId());PartListing partListing = amazonS3.listParts(listPartsRequest);List<PartSummary> parts = partListing.getParts();if (!task.getChunkNum().equals(parts.size())) {// 已上传分块数量与记录中的数量不对应,不能合并分块throw new JeecgBootException("分片缺失,请重新上传");}CompleteMultipartUploadRequest completeMultipartUploadRequest = new CompleteMultipartUploadRequest().withUploadId(task.getUploadId()).withKey(task.getObjectKey()).withBucketName(task.getBucketName()).withPartETags(parts.stream().map(partSummary -> new PartETag(partSummary.getPartNumber(), partSummary.getETag())).collect(Collectors.toList()));CompleteMultipartUploadResult result = amazonS3.completeMultipartUpload(completeMultipartUploadRequest);}
}
7.接口controller类AwsS3UploadController:
package com.ynfy.buss.awss3upload.controller;import com.ynfy.buss.awss3upload.entity.AwsS3Upload;
import com.ynfy.buss.awss3upload.entity.TaskParam;
import com.ynfy.buss.awss3upload.entity.dto.TaskInfoDTO;
import com.ynfy.buss.awss3upload.service.IAwsS3UploadService;
import io.swagger.annotations.Api;
import lombok.extern.slf4j.Slf4j;
import org.jeecg.common.api.vo.Result;
import org.jeecg.common.system.base.controller.JeecgController;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.validation.BindingResult;
import org.springframework.web.bind.annotation.*;import javax.validation.Valid;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;/*** @Description: AWS.S3 大文件分片上传* @Author: jeecg-boot* @Date: 2024-10-31* @Version: V1.0*/
@Api(tags = "AWS.S3 大文件分片上传")
@RestController
@RequestMapping("/awsS3Upload")
@Slf4j
public class AwsS3UploadController extends JeecgController<AwsS3Upload, IAwsS3UploadService> {@Autowiredprivate IAwsS3UploadService awsS3UploadService;/*** 获取上传进度** @param identifier 文件md5* @return*/@GetMapping("/task/{identifier}")public Result<TaskInfoDTO> taskInfo(@PathVariable("identifier") String identifier) {return Result.ok(awsS3UploadService.getTaskInfo(identifier));}/*** 创建一个上传任务** @return*/@PostMapping(value = "/task/init")public Result<TaskInfoDTO> initTask(@Valid @RequestBody TaskParam param, BindingResult bindingResult) {if (bindingResult.hasErrors()) {return Result.error(bindingResult.getFieldError().getDefaultMessage());}return Result.OK(awsS3UploadService.initTask(param));}/*** 获取每个分片的预签名上传地址** @param identifier* @param partNumber* @return*/@GetMapping("/task/{identifier}/{partNumber}")public Result<?> preSignUploadUrl(@PathVariable("identifier") String identifier, @PathVariable("partNumber") Integer partNumber) {AwsS3Upload task = awsS3UploadService.getByIdentifier(identifier);if (Objects.isNull(task)) {return Result.error("分片任务不存在");}Map<String, String> params = new HashMap<>();params.put("partNumber", partNumber.toString());params.put("uploadId", task.getUploadId());return Result.OK("", awsS3UploadService.genPreSignUploadUrl(task.getBucketName(), task.getObjectKey(), params));}/*** 合并分片** @param identifier* @return*/@PostMapping("/task/merge/{identifier}")public Result<?> merge(@PathVariable("identifier") String identifier) {awsS3UploadService.merge(identifier);return Result.OK();}}
入参类TaskParam:
package com.ynfy.buss.awss3upload.entity;import lombok.Data;
import lombok.ToString;
import lombok.experimental.Accessors;import javax.validation.constraints.NotBlank;
import javax.validation.constraints.NotNull;@Data
@ToString
@Accessors(chain = true)
public class TaskParam {/*** 文件唯一标识(MD5)*/@NotBlank(message = "文件标识不能为空")private String identifier;/*** 文件大小(byte)*/@NotNull(message = "文件大小不能为空")private Long totalSize;/*** 分片大小(byte)*/@NotNull(message = "分片大小不能为空")private Long chunkSize;/*** 文件名称*/@NotBlank(message = "文件名称不能为空")private String fileName;
}
dto
package com.ynfy.buss.awss3upload.entity.dto;import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.Accessors;@Data
@ToString
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
public class TaskInfoDTO {/*** 是否完成上传(是否已经合并分片)*/private boolean finished;/*** 文件地址*/private String path;/*** 上传记录*/private TaskRecordDTO taskRecord;}
package com.ynfy.buss.awss3upload.entity.dto;import cn.hutool.core.bean.BeanUtil;
import com.amazonaws.services.s3.model.PartSummary;
import com.ynfy.buss.awss3upload.entity.AwsS3Upload;
import lombok.Data;
import lombok.ToString;
import lombok.experimental.Accessors;import java.util.List;@Data
@ToString
@Accessors(chain = true)
public class TaskRecordDTO extends AwsS3Upload {/*** 已上传完的分片*/private List<PartSummary> exitPartList;public static TaskRecordDTO convertFromEntity(AwsS3Upload task) {TaskRecordDTO dto = new TaskRecordDTO();BeanUtil.copyProperties(task, dto);return dto;}
}
常量:
package com.ynfy.buss.awss3upload.constant;public interface MinioConstant {// 分块大小int DEFAULT_CHUNK_SIZE = 10 * 1024 * 1024;// 预签名url过期时间(ms)Long PRE_SIGN_URL_EXPIRE = 60 * 10 * 1000L;
}
二、前端
1.上传组件封装 AwsS3Upload.vue
<template><a-upload:maxCount="1":fileList="fileList":progress="progress":custom-request="handleHttpRequest"accept="video/*"@change="handleChange":on-remove="handleRemoveFile"><a-button :disabled="fileList.length == 1"><upload-outlined></upload-outlined>上传</a-button></a-upload>
</template><script lang="ts" setup>
import { UploadOutlined } from "@ant-design/icons-vue";
import md5 from "/@/utils/lib/md5";
import Queue from "promise-queue-plus";
import { ref, watch } from "vue";
import { useMessage } from "@/hooks/web/useMessage";
import { initTask, merge, preSignUrl, taskInfo } from "/@/api/awss3upload/awss3";
import axios from "axios";
import { getFileAccessHttpUrl } from "@/utils/common/compUtils";
import type { UploadProps } from "ant-design-vue";const { createMessage } = useMessage();
const emit = defineEmits(["update:modelValue"]);const fileList = ref<any>([]);const props = defineProps({modelValue: String
});watch(() => props.modelValue,(val) => {parsePathsValue(val);},{ immediate: true }
);// 解析数据库存储的逗号分割
function parsePathsValue(paths) {if (!paths || paths.length == 0) {fileList.value = [];return;}let list: any[] = [];for (const item of paths.split(",")) {let url = getFileAccessHttpUrl(item);list.push({uid: uidGenerator(),name: getFileName(item),status: "done",url: url,response: { status: "history", message: item }});}fileList.value = list;
}function getFileName(path) {if (path.lastIndexOf("\\") >= 0) {let reg = new RegExp("\\\\", "g");path = path.replace(reg, "/");}return path.substring(path.lastIndexOf("/") + 1);
}function uidGenerator() {return "-" + parseInt(Math.random() * 10000 + 1, 10);
}const progress: UploadProps["progress"] = {strokeColor: {"0%": "#108ee9","100%": "#87d068"},style: {margin: "15px 0"},strokeWidth: 3,format: percent => `${percent}%`
};// 文件上传分块任务的队列(用于移除文件时,停止该文件的上传队列) key:fileUid value: queue object
const fileUploadChunkQueue = ref({}).value;/*** 获取一个上传任务,没有则初始化一个*/
const getTaskInfo = async (file) => {let task;const identifier = await md5(file);task = await taskInfo(identifier);if (!task) {const initTaskData = {identifier,fileName: file.name,totalSize: file.size,chunkSize: 5 * 1024 * 1024};task = await initTask(initTaskData);}return task;
};/*** 上传逻辑处理,如果文件已经上传完成(完成分块合并操作),则不会进入到此方法中*/
const handleUpload = (file, taskRecord, options) => {let lastUploadedSize = 0; // 上次断点续传时上传的总大小let uploadedSize = 0; // 已上传的大小const totalSize = file.size || 0; // 文件总大小let startMs = new Date().getTime(); // 开始上传的时间const { exitPartList, chunkSize, chunkNum, fileIdentifier } = taskRecord;// 获取从开始上传到现在的平均速度(byte/s)const getSpeed = () => {// 已上传的总大小 - 上次上传的总大小(断点续传)= 本次上传的总大小(byte)const intervalSize = uploadedSize - lastUploadedSize;const nowMs = new Date().getTime();// 时间间隔(s)const intervalTime = (nowMs - startMs) / 1000;return intervalSize / intervalTime;};const uploadNext = async (partNumber: number) => {const start = chunkSize * (partNumber - 1);const end = start + chunkSize;const blob = file.slice(start, end);const preRes = await preSignUrl({identifier: fileIdentifier,partNumber: partNumber});if (preRes) {await axios.request({url: preRes,method: "PUT",data: blob,headers: { "Content-Type": "application/octet-stream" }});return Promise.resolve({ partNumber: partNumber, uploadedSize: blob.size });}return Promise.reject(`分片${partNumber}, 获取上传地址失败`);};/*** 更新上传进度* @param increment 为已上传的进度增加的字节量*/const updateProcess = (increment: number) => {const { onProgress } = options;let factor = 1000; // 每次增加1000 bytelet from = 0;// 通过循环一点一点的增加进度while (from <= increment) {from += factor;uploadedSize += factor;const percent = Math.round(uploadedSize / totalSize * 100).toFixed(2);onProgress({ percent: percent });}const speed = getSpeed();const remainingTime = speed != 0 ? Math.ceil((totalSize - uploadedSize) / speed) + "s" : "未知";console.log("剩余大小:", (totalSize - uploadedSize) / 1024 / 1024, "mb");console.log("当前速度:", (speed / 1024 / 1024).toFixed(2), "mbps");console.log("预计完成:", remainingTime);};return new Promise(resolve => {const failArr: any = [];const queue = Queue(5, {"retry": 3, //Number of retries"retryIsJump": false, //retry now?"workReject": function(reason, queue) {failArr.push(reason);},"queueEnd": function(queue) {resolve(failArr);}});fileUploadChunkQueue[file.uid] = queue;for (let partNumber = 1; partNumber <= chunkNum; partNumber++) {const exitPart = (exitPartList || []).find(exitPart => exitPart.partNumber == partNumber);if (exitPart) {// 分片已上传完成,累计到上传完成的总额中,同时记录一下上次断点上传的大小,用于计算上传速度lastUploadedSize += exitPart.size;updateProcess(exitPart.size);} else {queue.push(() => uploadNext(partNumber).then(res => {// 单片文件上传完成再更新上传进度updateProcess(res.uploadedSize);}));}}if (queue.getLength() == 0) {// 所有分片都上传完,但未合并,直接return出去,进行合并操作resolve(failArr);return;}queue.start();});
};/*** el-upload 自定义上传方法入口*/
const handleHttpRequest = async (options) => {const { onSuccess } = options;const file = options.file;const task = await getTaskInfo(file);if (task) {const { finished, taskRecord } = task;const { fileIdentifier: identifier } = taskRecord;if (finished) {emit("update:modelValue", taskRecord.objectKey);onSuccess(null, file);} else {const errorList: any = await handleUpload(file, taskRecord, options);if (errorList.length > 0) {createMessage.error("部分分片上次失败,请尝试重新上传文件");return;}await merge(identifier);emit("update:modelValue", taskRecord.objectKey);onSuccess(errorList, file);}} else {createMessage.error("文件上传错误");}
};/*** 移除文件列表中的文件* 如果文件存在上传队列任务对象,则停止该队列的任务*/
const handleRemoveFile = (uploadFile, uploadFiles) => {fileList.value = fileList.value.filter(item => item.uid != uploadFile.uid);const queueObject = fileUploadChunkQueue[uploadFile.uid];if (queueObject) {queueObject.stop();fileUploadChunkQueue[uploadFile.uid] = undefined;}emit("update:modelValue", "");
};const handleChange = (info) => {fileList.value = info.fileList;
};
</script><style scoped></style>
2.接口 awss3.ts
import { defHttp } from "/@/utils/http/axios";/*** 根据文件的md5获取未上传完的任务* @param identifier 文件md5*/
export const taskInfo = (identifier) => {return defHttp.get({ url: `/awsS3Upload/task/${identifier}` });
};/*** 初始化一个分片上传任务* @param identifier 文件md5* @param fileName 文件名称* @param totalSize 文件大小* @param chunkSize 分块大小*/
export const initTask = (params) => {return defHttp.post({ url: "/awsS3Upload/task/init", params });
};/*** 获取预签名分片上传地址* @param identifier 文件md5* @param partNumber 分片编号*/
export const preSignUrl = ({ identifier, partNumber }) => {return defHttp.get({ url: `/awsS3Upload/task/${identifier}/${partNumber}` });
};/*** 合并分片* @param identifier*/
export const merge = (identifier) => {return defHttp.post({ url: `/awsS3Upload/task/merge/${identifier}` });
};
3.使用
<AwsS3Upload v-model:modelValue="form.videoId"@update:modelValue="(value)=>{ form.videoId = value }" />