zoukankan      html  css  js  c++  java
  • s3对象存储

    bkstorages 模块帮助你在蓝鲸应用中使用多种文件存储服务作为后端,用于加速静态资源,管理用户上传文件。

    自定静态文件 storage

    如果通过修改配置文件满足不了你的需求,你随时可以通过继承 RGWBoto3Storage 的方式来自定义你自己的 storage:

    class MyStaticRGWBoto3Storage(RGWBoto3Storage):
    """My Storage class for storing static files
    """
    bucket_name = 'another_bucket'
    location = '/my_static_path'
    object_parameters = {
    'Expires': 'Wed, 30 Nov 2016 04:12:29 GMT',
    'CacheControl': 'max-age=86400'
    }


    # 修改 settings
    STATICFILES_STORAGE = 'custom_backend.MyStaticRGWBoto3Storage'


    除了将 RGWBoto3Storage 指定为文件存储后端外,你还可以通过 RGWBoto3Storage API 来手动使用它来管理文件。

    上传内容文件:

    from bkstorages.backends.rgw import RGWBoto3Storage

    storage = RGWBoto3Storage()

    # 使用 ContentFile
    f = ContentFile('Hello, RGW!')
    storage.save(u'/test/hello', f)
    上传文件对象:

    from tempfile import NamedTemporaryFile

    from django.core.files import File


    with NamedTemporaryFile() as fp:
    fp.write('Temp file')
    fp.flush()

    f = File(fp)
    storage.save(u'/test/temp_file.txt', f)
    查看文件链接:

    storage.url('/test/temp_file.txt')
    列出目录下所有文件:

    storage.listdir('/test')
    删除文件:

    storage.delete('/test/temp_file.txt')




    from bkstorages.backends.rgw import RGWBoto3Storage
    storage = RGWBoto3Storage()
    def task_file_upload(request):
        random_str = ''.join(random.sample(string.ascii_lowercase, 8))
        upload_dir = "%s/task_data/tmp/%s" % (FileUploadDir, random_str)
        response_dic = {'files': {}}
        user = request.COOKIES.get('bk_uid')
        l, l1, l2 = [], [], []
        l.append(user)
        l.sort(cmp)
        user_list = Users.objects.values('username')
        for i in user_list:
            username = i.get('username')
            l1.append(username)
        l1.sort(cmp)
        a = list(set(l) - set(l1))
        if a:
            Users(username=a[0]).save()
        user_name = Users.objects.get(username=user)
        for k, file_obj in request.FILES.items():
            filename = '%s/%s' % (upload_dir, file_obj.name)
            with NamedTemporaryFile() as destination:
                for chunk in file_obj.chunks():
                    destination.write(chunk)
                f = File(destination)
                storage.save(filename, f)
            size = storage.size(filename)
            ctime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time()))
            FileList(file_name=file_obj.name, file_size=size, file_create_time=ctime, user=user_name, files_dir=random_str).save()
        filelist = FileList.objects.filter(user=user_name)
        for file in filelist:
            response_dic['files'][file.id] = {'file_name': file.file_name, 'user': file.user.username, 'file_create_time': file.file_create_time, 'file_size': file.file_size, 'files_dir': file.files_dir}
        return HttpResponse(json.dumps(response_dic))
    
    def delete_file(request, random_str):
        response, file_abs = {}, ''
        if request.method == "POST":
            upload_dir = "%s/task_data/tmp/%s" % (FileUploadDir, random_str)
            filename = request.POST.get('filename')
            file_id = request.POST.get('fileid')
            if filename and file_id:
                file_abs = "%s/%s" % (upload_dir, filename.strip())
            if storage.exists(file_abs):
                storage.delete(file_abs)
                FileList.objects.filter(id=file_id).delete()
                response['msg'] = "file '%s' got deleted " % filename
            else:
                response["error"] = "file '%s' does not exist on server" % filename
        return HttpResponse(json.dumps(response))
    
    def send_zipfile(request, file_path, file_name):
        zip_file_name = '%s_files' % file_name
        archive = zipfile.ZipFile(zip_file_name, 'w', zipfile.ZIP_DEFLATED)
        file__name = file_name.encode('utf-8')
        filepath = storage.url('%s/%s' % (file_path, file__name.strip()))
        file_path = urllib.unquote(filepath.encode('utf-8'))
        bash('curl -O %s' % file_path)
        if os.path.isfile(file__name):
            archive.write(file__name, arcname=file__name)
            archive.close()
        wrapper = FileWrapper(open(zip_file_name, 'rb'))
        response = HttpResponse(wrapper, content_type='application/zip')
        response['Content-Disposition'] = 'attachment; filename=%s.zip' % (urlquote(zip_file_name))
        response['Content-Length'] = os.path.getsize(zip_file_name)
        return response
    
    
    def file_download(request, random_str):
        file_path, file_name = '', ''
        if request.method == "POST":
            file_path = "%s/task_data/tmp/%s" % (FileUploadDir, random_str)
            file_name = request.POST.get('filename')
        return send_zipfile(request, file_path, file_name)
  • 相关阅读:
    升级CentOS内核
    npm、component、spm、bower的区别
    Bower的使用
    Mac安装Bower
    Bower是什么?
    Mac下安装与配置Go语言开发环境
    jenkins升级为2.134
    nexus实现从windows迁移至Linux平台
    Jenkins构建完成后实现自动将war包部署到指定服务器
    使用jenkins构建一个maven项目
  • 原文地址:https://www.cnblogs.com/muzinan110/p/7573969.html
Copyright © 2011-2022 走看看