zoukankan      html  css  js  c++  java
  • Minio 使用.NET + Vue 实现断点续传、秒传

    来源:https://blog.csdn.net/qq_33474360/article/details/110238308

    Minio是什么
    官方解释:
    MinIO 是一个基于Apache License v2.0开源协议的对象存储服务。它兼容亚马逊S3云存储服务接口,非常适合于存储大容量非结构化的数据,例如图片、视频、日志文件、备份数据和容器/虚拟机镜像等,而一个对象文件可以是任意大小,从几kb到最大5T不等。

    说白了Minio就是一个文件管理服务工具,最大支持5个T的文件上传,具体存储机制与细节自行查看官网去研究。

    安装
    Minio服务安装与部署
    虽然官方提供了SDK,但是并不能满足我们的需要,所以我们需要去下载源码,但尽量不去修改源码,以免日后不好升级。

    从此处进入官网中文网站
    从此处进入官网英文网站

    Minio dotnet
    从此处下载
    将该项目中的Minio文件夹及解决方案目录下的Minio.snk、mono_install.sh、netfx.props文件拷贝至你的项目中,并将Minio项目添加到你的项目.
    前端参考
    主要参考该博主的分片上传流程,不用minio是一样的,代码几乎都是用的该博主的,具体的我就不贴了
    点击跳转

    后端流程及代码
    该流程也是根据前端来配合的,主要后端代码是从一个java朋友那里修改过来的,欢迎讨论

    前端发送第一次请求
    校验文件是否传输,以及minio分配uploadId及part[]
    uploadid是minio进行上传分片和合并操作的关键
    part[]是根据你传入的文件totalsize以5M为分割点进行的文件分片生成part[],在上传时part[]中的每一个part只有自己的序号(从1开始),没有任何文件信息,当进行分片上传时,上传对应的分片及序号即可

    逻辑代码
    文件已上传
    如果已经上传,返回skipUpload为true,以此实现秒传。

    /// <summary>
            /// 获取文件信息
            /// </summary>
            /// <param name="identifier"></param>
            /// <param name="totalSize"></param>
            public FileInfoModel GetFileInfo(string identifier, string fileName, long totalSize)
            {       
                // 根据MD5查询数据库此文件是否已上传成功
                // 成功则直接秒传
                using (MEDbContext db = new MEDbContext())
                {
                    // 验证该MD5是否存在
                    var entites = db.SystemFileBusinessEntities.Where(x => x.ObjectName == identifier && x.BucketName == bucketName);
                    if (!entites.Any())
                        return null;
                    // 验证文件名是否发生过变更
                    return FileUpdate(identifier, fileName, totalSize, db);
                }
            }
    
            /// <summary>
            /// 文件是否需要更新
            /// </summary>
            /// <param name="identifier"></param>
            /// <param name="fileName"></param>
            /// <param name="totalSize"></param>
            /// <param name="db"></param>
            /// <returns></returns>
            private static FileInfoModel FileUpdate(string identifier, string fileName, long totalSize, MEDbContext db)
            {
                string title = Path.GetFileNameWithoutExtension(fileName);
                string extension = Path.GetExtension(fileName);
                var entity = db.Set<SystemFileBusinessEntity>().FirstOrDefault(x => x.BucketName == bucketName && x.ObjectName == identifier && x.Title == title && x.Extension == extension);
                if (entity != null)
                {
                    return new FileInfoModel
                    {
                        Id = entity.Id,
                        NeedMerge = false,
                        SkipUpload = true,
                        FileName = fileName,
                    };
                }
                else
                {
                    entity = db.Set<SystemFileBusinessEntity>().Add(new SystemFileBusinessEntity
                    {
                        BucketName = bucketName,
                        CreateTime = DateTime.Now,
                        Extension = extension,
                        ObjectName = identifier,
                        Title = title,
                        UpdateTime = DateTime.Now,
                        Size = totalSize
                    }).Entity;
                    db.SaveChanges();
                    return new FileInfoModel
                    {
                        Id = entity.Id,
                        NeedMerge = false,
                        SkipUpload = true,
                        FileName = fileName,
                    };
                }
            }

    文件未上传

    如果没有上传,返回分片信息uploaded,实现断点续传,如果uploaded为空代表还没传输过该文件

    /// <summary>
            /// 查询分片信息
            /// </summary>
            /// <param name="identifier"></param>
            /// <param name="totalSize"></param>
            /// <param name="fileName"></param>
            /// <returns></returns>
            public async Task<int[]> GePartInfoAsync(string identifier, long totalSize, string fileName)
            {
                // Redis查询已上传分片,返回未上传分片
                var basicInfoIsExist = await _redisHelper.HashExistsAsync("file_" + identifier, "basic_info");
                if (basicInfoIsExist == false)
                {
                    var partDic = await Global.MINIOAPI.MultUploadByStreamAsync(bucketName, identifier, totalSize, null, "application/octet-stream", null);
                    var fileModel = new FileModel();
                    fileModel.UploadId = partDic["uploadId"].ToString();
                    fileModel.Parts = (Part[])partDic["parts"];
                    fileModel.Md5 = identifier;
                    fileModel.Size = totalSize;
                    fileModel.FileName = fileName;
                    await _redisHelper.HashSetAsync("file_" + identifier, "basic_info", fileModel);
                    return new int[] { };
                }
                List<string> keys = await _redisHelper.HashKeysAsync<string>("file_" + identifier + "_part");
                return keys.Select(s => Convert.ToInt32(s)).ToArray();
        }

    minio生成uploadId及parts[]关键代码

    /// <summary>
            /// 在进行多文件上传之前,先初始化,获取uploaderId等信息
            /// </summary>
            /// <param name="bucketName"></param>
            /// <param name="objectName"></param>
            /// <param name="headerMap"></param>
            /// <param name="contentType"></param>
            /// <returns></returns>
            public async Task<string> InitMultUploadAsync(string bucketName, string objectName,
                                         Dictionary<string, string> headerMap, string contentType)
            {
    
                if (headerMap == null)
                {
                    headerMap = new Dictionary<string, string>();
                }
    
                if (contentType == null)
                {
                    if (!headerMap.ContainsKey("Content-Type"))
                    {
                        headerMap.Add("Content-Type", "application/octet-stream");
                    }
                }
                else
                {
                    headerMap.Add("Content-Type", contentType);
                }
    
    
                string uploadId = await this.NewMultipartUploadAsync(bucketName, objectName, new Dictionary<string, string>(), headerMap, default).ConfigureAwait(false);
                return uploadId;
            }
    
            public Part[] MakeMultUpload(long size, Part[] parts)
            {
                /* Multipart upload */
                Part[] totalParts = parts;
                if (totalParts == null)
                {
                    dynamic multiPartInfo = utils.CalculateMultiPartSize(size);
                    double partSize = multiPartInfo.partSize;
                    double partCount = multiPartInfo.partCount;
                    double lastPartSize = multiPartInfo.lastPartSize;
                    totalParts = new Part[(int)partCount];
                    for (int i = 0; i < totalParts.Length; i++)
                    {
                        totalParts[i] = new Part() { PartNumber = i + 1 };
                        if (i != totalParts.Length - 1)
                        {
                            totalParts[i].Size = ((long)partSize);
                        }
                        else
                        {
                            totalParts[i].Size = ((long)lastPartSize);
                        }
                    }
                }
                return totalParts;
            }

    分片上传

    逻辑代码

    等校验完毕后就会根据以5M一片的次数发起post请求上传一个个的分片
    上传成功后将分片信息存入,主要是分片ChunkNumber及Etag

    /// <summary>
            /// 上载分片
            /// </summary>
            /// <returns>是否需要合并</returns>
            public async Task UploadPartAsync(MinioFilePartUpload partData)
            {
                string md5 = partData.Identifier;
                int partNumber = partData.ChunkNumber;
                FileModel fileModel = await _redisHelper.HashGeAsync<FileModel>("file_" + md5, "basic_info");
                if (fileModel == null) // 为空异常(可能传完了)
                    throw new Exception("上传出现异常");
                // 加锁上传()
                string key = "file_" + md5 + "_part";
                string strPartNumber = partNumber.ToString();
    
                string lockKey = "lock_" + key + "_" + strPartNumber;
                string locVlue = strPartNumber;
                try
                {
                    if (_redisHelper.LockTake(lockKey, locVlue, 20))
                    {
                        Console.WriteLine(locVlue + ":上传开始");
                        var isExist = await _redisHelper.HashExistsAsync(key, strPartNumber);
                        if (isExist == false)
                        {
                            long size = partData.UpFile.Length;
                            Part[] parts = await Global.MINIOAPI.MultUploadByStreamAsync(fileModel.UploadId, bucketName, md5, partData.UpFile.OpenReadStream(), size, fileModel.Parts, partNumber);
                            string etag = parts[partNumber - 1].ETag;
                            if (string.IsNullOrEmpty(etag))
                                throw new Exception($"{strPartNumber}:{partData.ChunkNumber}获取文件etag失败");
                            _ = await _redisHelper.HashSetAsync(key, strPartNumber, etag);
                            await _redisHelper.HashIncrementAsync("file_" + md5, "part_count", 1);
                        }
                    }
                }
                finally
                {
                    _redisHelper.LockRelease(lockKey, locVlue);
                }
            }

    minio上传分片关键代码

    此处主要是根据上传分片流获取etag,合并时要根据etag作合并操作

    public async Task<Part[]> MakeMultUploadAsync(string uploadId, string bucketName, string objectName, long expectedReadSize, Object data, Part[] parts, int partNumber)
            {
                /* Multipart upload */
                Part[] totalParts = parts;
                try
                {
                    Stream stream = (Stream)data;
                    byte[] bytes = new byte[stream.Length];
                    await stream.ReadAsync(bytes, 0, bytes.Length);
                   
                    Dictionary<string, string> metaData = new Dictionary<string, string>();
                    metaData["Content-Type"] = "application/octet-stream";
                    var sseHeaders = new Dictionary<string, string>();
                    ServerSideEncryption serverSideEncryption = new  SSES3();
                    serverSideEncryption.Marshal(sseHeaders);
                    string etag = await PutObjectAsync(bucketName, objectName, uploadId, partNumber, bytes, metaData, sseHeaders, default);
                   
                    totalParts[partNumber - 1].ETag = (etag);
                }
                catch (Exception e)
                {
                    //出错后打出异常,继续执行,这样就可以得到出错的卷
                    //totalParts[partNumber - 1].setState(-1);
                    Console.WriteLine(e.StackTrace);
                }
    
                return totalParts;
            }

    合并

    逻辑代码

    /// <summary>
            /// 分片合并
            /// </summary>
            /// <param name="md5"></param>
            /// <param name="fileName"></param>
            /// <returns></returns>
            public async Task Compose(FileCompose fileCompose)
            {
                string identifier = fileCompose.Identifier;
                string key = "file_" + identifier;
                var fileModel = await _redisHelper.HashGeAsync<FileModel>("file_" + identifier, "basic_info");
                if (fileModel == null) // 为空代表异常(可能传完了)
                    throw new Exception("上传出现异常");
                try
                {
                    if (!_redisHelper.LockTake("lock" + key, key, 20))
                    {
                        Console.WriteLine("该锁已被使用");
                        return;
                    }
                    string cutStr = await _redisHelper.HashGeAsync(key, "part_count");
                    string fCoutStr = fileModel.Parts.Length.ToString();
                    while (cutStr != fCoutStr)
                    {
                        await Task.Delay(200);
                        cutStr = await _redisHelper.HashGeAsync(key, "part_count");
                    }
    
                    using (MEDbContext db = new MEDbContext())
                    {
                        // 查询数据库中是否已存入信息
                        var entity = db.Set<SystemFileBusinessEntity>().FirstOrDefault(x => x.BucketName == bucketName && x.ObjectName == identifier);
                        if (entity == null)
                        {
                            Part[] parts = new Part[fileModel.Parts.Length];
                            for (int i = 0; i < fileModel.Parts.Length; i++)
                            {
                                parts[i] = fileModel.Parts[i];
                                parts[i].ETag = await _redisHelper.HashGeAsync("file_" + identifier + "_part", fileModel.Parts[i].PartNumber.ToString());
                            }
                            await Global.MINIOAPI.CommitMultUploadAsync(fileModel.UploadId, bucketName, identifier, parts);
                            _redisHelper.KeyDelete(new string[] { "file_" + identifier, "file_" + identifier + "_part" }.ToList());
                        }
                        FileUpdate(identifier, fileModel.FileName, fileModel.Size, db);
                    }
                }
                finally
                {
                    _redisHelper.LockRelease("lock" + key, key);
                }
            }

    minio

    public Task CommitMultUploadAsync(string uploadId, string bucketName, string objectName, Part[] parts)
            {
                Dictionary<int, string> etags = new Dictionary<int, string>();
                for (int partNumber = 1; partNumber <= parts.Length; partNumber++)
                {
                    etags[partNumber] = parts[partNumber - 1].ETag;
                }
                //this.Secure = true;
                Dictionary<string, string> metaData = new Dictionary<string, string>();
                metaData["Content-Type"] = "application/octet-stream";
                var sseHeaders = new Dictionary<string, string>();
                ServerSideEncryption serverSideEncryption = new SSES3();
                serverSideEncryption.Marshal(sseHeaders);
                return this.CompleteMultipartUploadAsync(bucketName, objectName, uploadId, etags, sseHeaders, default);
            }
    
            // 如果需要调用MD5SUM进行文件校验的话就加上 Content-MD5 Header,否则调用原来Minio的方法即可,不用新加重写
            private async Task CompleteMultipartUploadAsync(string bucketName, string objectName, string uploadId, Dictionary<int, string> etags, Dictionary<string, string> meta, CancellationToken cancellationToken)
            {
                //this.Secure = true;
                var request = await this.CreateRequest(Method.POST, bucketName,
                                                         objectName: objectName,
                                                         headerMap: meta)
                                        .ConfigureAwait(false);
                request.AddQueryParameter("uploadId", $"{uploadId}");
    
                List<XElement> parts = new List<XElement>();
    
                for (int i = 1; i <= etags.Count; i++)
                {
                    parts.Add(new XElement("Part",
                                           new XElement("PartNumber", i),
                                           new XElement("ETag", etags[i])));
                }
    
                var completeMultipartUploadXml = new XElement("CompleteMultipartUpload", parts);
                var bodyString = completeMultipartUploadXml.ToString();
                var body = System.Text.Encoding.UTF8.GetBytes(bodyString);
    
                request.AddParameter("application/xml", body, ParameterType.RequestBody);
                
                //var md5 = MD5.Create();
                //byte[] hash = md5.ComputeHash(body);
                //string base64 = Convert.ToBase64String(hash);
                //request.AddOrUpdateParameter("Content-MD5", base64, ParameterType.HttpHeader);
                var response = await this.ExecuteTaskAsync(this.NoErrorHandlers, request, cancellationToken).ConfigureAwait(false);
            }

    前端注意:

    加上此属性 forceChunkSize: true, // 强制每片都小于分片大小
    在进行校验MD5的时候将暂停按钮隐藏,否则会出现上传时传递的不是MD5的情况
    参考资料:
    https://blog.csdn.net/lmlm21/article/details/107768581
    https://blog.csdn.net/anxyh_name/article/details/108397774
    https://www.cnblogs.com/xiahj/p/vue-simple-uploader.html
    ————————————————
    版权声明:本文为CSDN博主「洋洋洒洒魏先生」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/qq_33474360/article/details/110238308

  • 相关阅读:
    定位IO瓶颈的方法,iowait低,IO就没有到瓶颈?
    10分钟检查自己的系统性能数据
    netperf使用指南
    如何看内核源码
    xxx
    os.path 模块
    目前中国智能语音产业的格局、现状
    NLP-python 自然语言处理01
    15本经典金融投资著作
    写给步入工作的自己
  • 原文地址:https://www.cnblogs.com/yibinboy/p/14498097.html
Copyright © 2011-2022 走看看