Contents

1. MinIO Quick Setup
    1.1 Pull the docker image
    1.2 Start the docker container
2. Multipart Upload of Large Files to MinIO
    2.1 Add the dependencies
    2.2 Implement the MinioClient
    2.3 Implement the multipart upload
        2.3.0 Initialize the MinioClient
        2.3.1 Prepare the multipart upload
        2.3.2 Split and upload
            2.3.2.1 Set the part size
            2.3.2.2 Split the file
        2.3.3 Merge the parts
3. Testing
    3.1 Full test code
    3.2 Run log and results

1. MinIO Quick Setup
This section briefly walks through standing up MinIO with docker.
1.1 Pull the docker image

First, try pulling the image directly:

```shell
docker pull minio/minio
```

If the pull fails, try switching the docker registry mirrors:

```shell
echo '{
  "registry-mirrors": [
    "https://4xxwxhl6.mirror.aliyuncs.com",
    "https://mirror.iscas.ac.cn",
    "https://docker.rainbond.cc",
    "https://docker.nju.edu.cn",
    "https://6kx4zyno.mirror.aliyuncs.com",
    "https://mirror.baidubce.com",
    "https://docker.m.daocloud.io",
    "https://dockerproxy.com"
  ]
}' | sudo tee /etc/docker/daemon.json > /dev/null
```

Then restart the docker service so the new configuration takes effect:

```shell
sudo systemctl restart docker
```

Finally, pull the image again.
1.2 Start the docker container

First create the config and data directories:

```shell
mkdir -p /opt/minio/config
mkdir -p /opt/minio/data
```

Then start the container:

```shell
docker run -p 9000:9000 -p 9001:9001 --net=host --name minio -d --restart=always \
    -e "MINIO_ACCESS_KEY=minio" -e "MINIO_SECRET_KEY=minio123" \
    -v /opt/minio/data:/data -v /opt/minio/config:/root/.minio \
    minio/minio server /data --console-address ":9001" --address ":9000"
```

Finally, open the MinIO console at http://192.168.2.195:9001 and do some basic configuration of buckets, users, and user groups. For example, create a new user with username minioUser and password minioUser123.
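If you prefer scripting over clicking through the web console, the same user and bucket setup can be done with the MinIO client `mc`. This is a sketch: the alias name `local` and the `readwrite` policy choice are assumptions, and the root credentials come from the `docker run` above.

```shell
# point mc at the server started above (root credentials from the docker run)
mc alias set local http://192.168.2.195:9000 minio minio123

# create the user and the test bucket
mc admin user add local minioUser minioUser123
mc mb local/test

# grant the user read/write access to the buckets
mc admin policy attach local readwrite --user minioUser
```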
2. Multipart Upload of Large Files to MinIO

2.1 Add the dependencies

Note that minio 8.3.3 requires okhttp version 4.8.1 or later.
```groovy
// minio 8.3.3 must use okhttp >= 4.8.1
implementation 'io.minio:minio:8.3.3'
implementation 'com.squareup.okhttp3:okhttp:4.12.0'
```

2.2 Implement the MinioClient
Following the official S3 documentation (https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html#mpu-process), multipart upload of a large file boils down to three steps:

- initMultiPartUpload: create a multipart upload task for the large file
- uploadMultiPart: upload the parts one by one
- mergeMultipartUpload: merge the parts

By extending the default MinioClient, we expose the relevant protected methods for direct use.
```java
package com.szh.minio;

import com.google.common.collect.Multimap;
import io.minio.*;
import io.minio.errors.*;
import io.minio.messages.Part;

import java.io.IOException;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;

public class CustomMinioClient extends MinioClient {

    /** Wrap the parent client */
    public CustomMinioClient(MinioClient client) {
        super(client);
    }

    /** Initialize a multipart upload, i.e. obtain the uploadId */
    public String initMultiPartUpload(String bucket, String region, String object,
                                      Multimap<String, String> headers,
                                      Multimap<String, String> extraQueryParams)
            throws IOException, InvalidKeyException, NoSuchAlgorithmException, InsufficientDataException,
            ServerException, InternalException, XmlParserException, InvalidResponseException, ErrorResponseException {
        CreateMultipartUploadResponse response =
                this.createMultipartUpload(bucket, region, object, headers, extraQueryParams);
        return response.result().uploadId();
    }

    /** Upload a single part */
    public UploadPartResponse uploadMultiPart(String bucket, String region, String object, Object data,
                                              long length, String uploadId, int partNumber,
                                              Multimap<String, String> headers,
                                              Multimap<String, String> extraQueryParams)
            throws IOException, InvalidKeyException, NoSuchAlgorithmException, InsufficientDataException,
            ServerException, InternalException, XmlParserException, InvalidResponseException, ErrorResponseException {
        return this.uploadPart(bucket, region, object, data, length, uploadId, partNumber, headers, extraQueryParams);
    }

    /** Merge the parts */
    public ObjectWriteResponse mergeMultipartUpload(String bucketName, String region, String objectName,
                                                    String uploadId, Part[] parts,
                                                    Multimap<String, String> extraHeaders,
                                                    Multimap<String, String> extraQueryParams)
            throws IOException, NoSuchAlgorithmException, InsufficientDataException, ServerException,
            InternalException, XmlParserException, InvalidResponseException, ErrorResponseException, InvalidKeyException {
        return this.completeMultipartUpload(bucketName, region, objectName, uploadId, parts, extraHeaders, extraQueryParams);
    }

    /** Abort a multipart upload */
    public void cancelMultipartUpload(String bucketName, String region, String objectName, String uploadId,
                                      Multimap<String, String> extraHeaders,
                                      Multimap<String, String> extraQueryParams)
            throws ServerException, InsufficientDataException, ErrorResponseException, NoSuchAlgorithmException,
            IOException, InvalidKeyException, XmlParserException, InvalidResponseException, InternalException {
        this.abortMultipartUpload(bucketName, region, objectName, uploadId, extraHeaders, extraQueryParams);
    }

    /** List the parts uploaded so far */
    public ListPartsResponse listMultipart(String bucketName, String region, String objectName,
                                           Integer maxParts, Integer partNumberMarker, String uploadId,
                                           Multimap<String, String> extraHeaders,
                                           Multimap<String, String> extraQueryParams)
            throws NoSuchAlgorithmException, InsufficientDataException, IOException, InvalidKeyException,
            ServerException, XmlParserException, ErrorResponseException, InternalException, InvalidResponseException {
        return this.listParts(bucketName, region, objectName, maxParts, partNumberMarker, uploadId, extraHeaders, extraQueryParams);
    }
}
```

2.3 Implement the multipart upload
2.3.0 Initialize the MinioClient

Connect to minio and make sure the bucket exists.

```java
static CustomMinioClient minioClient = new CustomMinioClient(
        MinioClient.builder()
                .endpoint("http://192.168.2.195:9000")
                .credentials("minioUser", "minioUser123")
                .build());

// test bucket
static String bucketName = "test";

static {
    try {
        boolean found = minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build());
        if (!found) {
            minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucketName).build());
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
```

2.3.1 Prepare the multipart upload
Create a multipart upload task for the large file.

```java
String contentType = "application/octet-stream";
HashMultimap<String, String> headers = HashMultimap.create();
headers.put("Content-Type", contentType);
String uploadId = minioClient.initMultiPartUpload(bucketName, null, file.getName(), headers, null);
System.out.println("uploadId: " + uploadId);
```

2.3.2 Split and upload
This article does the splitting and uploading purely on the server side. In real projects it is usually better for the backend to first call minio's getPresignedObjectUrl to generate a signed upload URL for each part, and then have the frontend upload the parts directly to minio with those URLs, sparing the backend service the network I/O overhead. For that approach, see "MinIO multipart upload of very large files (not purely server-side)".
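As a rough sketch of that presigned-URL variant (assuming minio 8.3.3; the class name, one-hour expiry, and method name here are illustrative, not from the original article), the backend can sign one PUT URL per part by passing the uploadId and partNumber as extra query parameters:

```java
import io.minio.GetPresignedObjectUrlArgs;
import io.minio.MinioClient;
import io.minio.http.Method;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class PresignedPartUrls {
    // Generate a presigned PUT URL for one part of an ongoing multipart upload.
    // The frontend can PUT the raw part bytes directly to this URL.
    static String presignedPartUrl(MinioClient client, String bucket, String object,
                                   String uploadId, int partNumber) throws Exception {
        Map<String, String> query = new HashMap<>();
        query.put("uploadId", uploadId);
        query.put("partNumber", String.valueOf(partNumber));
        return client.getPresignedObjectUrl(
                GetPresignedObjectUrlArgs.builder()
                        .method(Method.PUT)
                        .bucket(bucket)
                        .object(object)
                        .expiry(1, TimeUnit.HOURS)
                        .extraQueryParams(query)
                        .build());
    }
}
```

Signing happens locally in the SDK, so generating these URLs adds no extra round-trips; the part etags still have to be collected (e.g. from the ETag response header of each frontend PUT) for the merge step.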
2.3.2.1 Set the part size

On the one hand, note that a single part must be at least 5MB; if any part (other than the last) is smaller, minio/S3 fails when merging with: code = EntityTooSmall, message = "Your proposed upload is smaller than the minimum allowed object size".

On the other hand, when tuning the part size, note that minio/S3 only allows part numbers in the range [1, 10000].
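To make the two constraints concrete, here is a small helper (a sketch; the class and method names are made up for illustration) that computes the part count for a given file size and checks a chunk size against the 5MB floor and the 10000-part ceiling:

```java
public class PartPlan {
    static final long MIN_PART_SIZE = 5L * 1024 * 1024; // 5MB minimum (except the last part)
    static final int MAX_PARTS = 10_000;                // S3/minio part numbers are in [1, 10000]

    // Number of parts needed to cover totalLength with parts of chunkSize bytes (ceiling division)
    static long partCount(long totalLength, long chunkSize) {
        return (totalLength + chunkSize - 1) / chunkSize;
    }

    // A chunk size is valid when it meets the 5MB floor and keeps the part count <= 10000
    static boolean isValid(long totalLength, long chunkSize) {
        return chunkSize >= MIN_PART_SIZE && partCount(totalLength, chunkSize) <= MAX_PARTS;
    }
}
```

For the 3576974860-byte file used later in this article, a 5MB chunk size yields 683 parts, comfortably inside the limit.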
2.3.2.2 Split the file

On the one hand, to keep splitting efficient, a thread pool provides concurrency and RandomAccessFile provides random access into the file, so the parts can be produced quickly; the concurrency level and part size can be tuned to avoid OOM during concurrent splitting.

On the other hand, since the final merge must happen only after all parts are uploaded, a CountDownLatch is used to wait for every part upload to finish before running the merge.
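The splitting scheme above (thread pool + RandomAccessFile + CountDownLatch) can be sketched independently of minio like this; the class and method names are made up, and the "upload" is replaced by collecting the chunk bytes in memory:

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ChunkSplitSketch {
    // Read [position, position + length) from the file, as one upload task would
    static byte[] readChunk(File file, long position, int length) throws IOException {
        byte[] buffer = new byte[length];
        try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
            raf.seek(position); // jump straight to the chunk's offset
            raf.readFully(buffer);
        }
        return buffer;
    }

    // Split the file into chunks concurrently; the latch guarantees all chunks are
    // finished before this method returns (the point where the merge would run)
    static byte[][] splitConcurrently(File file, int chunkSize, int threads) throws Exception {
        long total = file.length();
        int chunkCount = (int) ((total + chunkSize - 1) / chunkSize);
        byte[][] chunks = new byte[chunkCount][];
        CountDownLatch latch = new CountDownLatch(chunkCount);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            for (int i = 0; i < chunkCount; i++) {
                final int index = i;
                final long position = (long) i * chunkSize;
                final int length = (int) Math.min(chunkSize, total - position);
                pool.submit(() -> {
                    try {
                        chunks[index] = readChunk(file, position, length);
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    } finally {
                        latch.countDown(); // count down even on failure, so await() cannot hang
                    }
                });
            }
            latch.await(); // all chunks done -> safe to merge
        } finally {
            pool.shutdown();
        }
        return chunks;
    }
}
```

In the real uploader, each task would pass its buffer to uploadMultiPart instead of keeping it.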
2.3.3 Merge the parts

Merge all uploaded parts.

```java
Part[] parts = new Part[(int) chunkCount];
// Query the uploaded parts. S3 allows at most 10000, numbered from 1
ListPartsResponse partResult =
        minioClient.listMultipart(bucketName, null, file.getName(), 10000, 0, uploadId, null, null);
int partNumber = 1;
for (Part part : partResult.result().partList()) {
    parts[partNumber - 1] = new Part(partNumber, part.etag());
    partNumber++;
}
ObjectWriteResponse objectWriteResponse =
        minioClient.mergeMultipartUpload(bucketName, null, file.getName(), uploadId, parts, null, null);
```

3. Testing
3.1 Full test code

```java
package com.szh.minio;

import com.google.common.collect.HashMultimap;
import io.minio.*;
import io.minio.messages.Part;
import lombok.Getter;
import lombok.Setter;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Setter
@Getter
public class MinioMain {

    static CustomMinioClient minioClient = new CustomMinioClient(
            MinioClient.builder()
                    .endpoint("http://192.168.2.195:9000")
                    .credentials("minioUser", "minioUser123")
                    .build());

    // test bucket
    static String bucketName = "test";

    static {
        try {
            boolean found = minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build());
            if (!found) {
                minioClient.makeBucket(MakeBucketArgs.builder().bucket(bucketName).build());
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    // the large file to be uploaded in parts
    static String filePath = "C:\\tmp\\psi_result.csv";
    static File file = new File(filePath);

    // single part size: 5MB; parts smaller than 5MB make minio/S3 fail at merge time:
    // code = EntityTooSmall, message = "Your proposed upload is smaller than the minimum allowed object size."
    static final long CHUNK_SIZE = 5 * 1024 * 1024;

    // current part number; minio/S3 allows part numbers in the range [1, 10000]
    // https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html#mpu-process
    private int chunkIndex;

    // used to learn when all parts have been uploaded, so the merge can start
    private static CountDownLatch countDownLatch;

    public static void main(String[] args) throws Exception {
        // Step 1: prepare the multipart upload
        String contentType = "application/octet-stream";
        HashMultimap<String, String> headers = HashMultimap.create();
        headers.put("Content-Type", contentType);
        String uploadId = minioClient.initMultiPartUpload(bucketName, null, file.getName(), headers, null);
        System.out.println("uploadId: " + uploadId);

        // Step 2: split and upload
        // ps: in real projects the backend can first call getPresignedObjectUrl to generate a signed
        // upload URL per part and let the frontend upload directly to minio, saving the backend the network overhead
        long totalLength = file.length();
        System.out.println("totalLength: " + totalLength + " Byte");
        // compute the number of parts
        long chunkCount = (totalLength + CHUNK_SIZE - 1) / CHUNK_SIZE;
        System.out.println("chunkCount: " + chunkCount);
        countDownLatch = new CountDownLatch((int) chunkCount);
        // 5 core threads upload the parts concurrently
        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(5);
        for (long i = 0; i < chunkCount; i++) {
            long position = i * CHUNK_SIZE;
            int bytesRead = (int) Math.min(CHUNK_SIZE, totalLength - position);
            MinioMain minioMain = new MinioMain();
            // S3 part numbers start at 1
            minioMain.setChunkIndex((int) i + 1);
            fixedThreadPool.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        // upload one part
                        minioMain.processChunk(filePath, position, bytesRead, uploadId);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                }
            });
        }
        countDownLatch.await();
        fixedThreadPool.shutdownNow();

        // Step 3: merge the parts
        System.out.println("ready to merge " + file.getName() + " - " + uploadId + " - " + bucketName);
        Part[] parts = new Part[(int) chunkCount];
        // Query the uploaded parts. S3 allows at most 10000, numbered from 1
        ListPartsResponse partResult =
                minioClient.listMultipart(bucketName, null, file.getName(), 10000, 0, uploadId, null, null);
        int partNumber = 1;
        for (Part part : partResult.result().partList()) {
            parts[partNumber - 1] = new Part(partNumber, part.etag());
            partNumber++;
        }
        ObjectWriteResponse objectWriteResponse =
                minioClient.mergeMultipartUpload(bucketName, null, file.getName(), uploadId, parts, null, null);
        System.out.println("mergeMultipartUpload resp etag: " + objectWriteResponse.etag());
        StatObjectResponse statObjectResponse = minioClient.statObject(
                StatObjectArgs.builder().bucket(bucketName).object(file.getName()).build());
        System.out.println("etag: " + statObjectResponse.etag()
                + " size: " + statObjectResponse.size()
                + " lastModified: " + statObjectResponse.lastModified());
    }

    private void processChunk(String filePath, long position, int bytesRead, String uploadId) {
        // tune the concurrency level and part size to avoid OOM
        byte[] buffer = new byte[bytesRead];
        RandomAccessFile raf = null;
        try {
            int chunkIndex = this.getChunkIndex();
            raf = new RandomAccessFile(filePath, "r");
            // seek to the given position
            raf.seek(position);
            // read bytesRead bytes as this part
            raf.readFully(buffer);
            String contentType = "application/octet-stream";
            HashMultimap<String, String> headers = HashMultimap.create();
            headers.put("Content-Type", contentType);
            UploadPartResponse uploadPartResponse = minioClient.uploadMultiPart(
                    bucketName, null, file.getName(),
                    buffer, bytesRead,
                    uploadId, chunkIndex, headers, null);
            System.out.println("chunk[" + chunkIndex + "] buffer size: [" + buffer.length
                    + " Byte] upload etag: [" + uploadPartResponse.etag() + "]");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (raf != null) {
                try {
                    raf.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            countDownLatch.countDown();
        }
    }
}
```

3.2 Run log and results
The run log:

```
uploadId: MzFiMWRmZjctMDg0Yy00YzMyLTk5NTYtMjRkZGZiMDZlYjJhLmUwZmFkNzFiLWEwZTctNDU1Yi04ZWFjLWFhODQyZjBiMmIyOXgxNzI3MzQwMjUzMTA2Njc5MTEz
totalLength: 3576974860 Byte
chunkCount: 683
chunk[1] buffer size: [5242880 Byte] upload etag: [97096e510d1dcda56646608345de08ea]
chunk[3] buffer size: [5242880 Byte] upload etag: [d8102f80f10eb79f600cdf2d378ae8fe]
chunk[4] buffer size: [5242880 Byte] upload etag: [b74f9b8fa2025580b4fc00449c66e271]
chunk[5] buffer size: [5242880 Byte] upload etag: [e77603ee49cc3f7d229f124ecd9a3f38]
chunk[2] buffer size: [5242880 Byte] upload etag: [b148b311ccd2b3fcd4777d56a8758c3d]
chunk[6] buffer size: [5242880 Byte] upload etag: [94abe5a7a2117b612d9805029398cfd9]
chunk[7] buffer size: [5242880 Byte] upload etag: [433b52aed0d1b1486df07a2259932a83]
chunk[8] buffer size: [5242880 Byte] upload etag: [2c242bd205f9b3c4546454fe2d0abef4]
...
chunk[679] buffer size: [5242880 Byte] upload etag: [8492b0573cc74ec55cb6d2a86aee0f69]
chunk[678] buffer size: [5242880 Byte] upload etag: [4aa5c01b4f7aea95952ec62d71ee9996]
chunk[681] buffer size: [5242880 Byte] upload etag: [ac0b739044bfd2644fc8da97fc03a1a9]
chunk[680] buffer size: [5242880 Byte] upload etag: [d95ee210ac774b3ca26e091941c66e20]
chunk[682] buffer size: [5242880 Byte] upload etag: [75e78df64c1fad0839ba8a1583cd93ec]
chunk[683] buffer size: [1330700 Byte] upload etag: [2f30c8d65e23d266c7f10f051854bc6a]
ready to merge psi_result.csv - MzFiMWRmZjctMDg0Yy00YzMyLTk5NTYtMjRkZGZiMDZlYjJhLmUwZmFkNzFiLWEwZTctNDU1Yi04ZWFjLWFhODQyZjBiMmIyOXgxNzI3MzQwMjUzMTA2Njc5MTEz - test
mergeMultipartUpload resp etag: ff6ebd330b3cb224ade84463dd14df82-683
etag: ff6ebd330b3cb224ade84463dd14df82-683 size: 3576974860 lastModified: 2024-09-26T09:09Z
```

After the upload, the merged object is visible in the MinIO console. (Screenshot of the console after the upload.)