zoukankan      html  css  js  c++  java
  • Oss文件存储

     

    包含文件的上传下载和生成临时的url

    # -*- coding: utf-8 -*-
    
    import os
    import oss2
    import configparser
    from Config import *
    
    class AliOss:
        def __init__(self):
            # 读取配置文件
            self._cf = configparser.ConfigParser()
            self._cf.read(ConfigPath)
    
            # 读取环境变量或定义常量
            self._access_key_id = os.getenv('OSS_TEST_ACCESS_KEY_ID', self._cf.get("oss","OSS_TEST_ACCESS_KEY_ID"))
            self._access_key_secret = os.getenv('OSS_TEST_ACCESS_KEY_SECRET', self._cf.get("oss","OSS_TEST_ACCESS_KEY_SECRET"))
            self._bucket_name = os.getenv('OSS_TEST_BUCKET', self._cf.get("oss","OSS_TEST_BUCKET"))
            self._endpoint = os.getenv('OSS_TEST_ENDPOINT', self._cf.get("oss","OSS_TEST_ENDPOINT"))
            # self._filepath = self._cf.get("oss","FILEPATH")
            
            # OSS认证信息
            auth = oss2.Auth(self._access_key_id, self._access_key_secret)
    
            # 创建Bucket对象,所有Object相关的接口都可以通过Bucket对象来进行
            self._bucket = oss2.Bucket(auth, self._endpoint, self._bucket_name)
    
        # 上传文件至oss
        #
        # [Params]
        # from_file: 上传对象文件
        # to_oss: 上传至oss的文件名
        # remove_from_file: 是否删除原文件
        #
        async def upload(self, from_file, to_oss=None, remove_from_file=False):
            # 上传oss的文件名未指定,则默认为当前文件名
            oss_file_name = os.path.basename(from_file) if to_oss is None else to_oss
            self._bucket.put_object_from_file(oss_file_name, from_file)
            
            # 删除原文件
            if remove_from_file:
                os.remove(from_file)
            
            return True
    
        # 下载文件
        #
        # [Params]
        # from_oss: 上传对象文件
        # out_path: 上传至oss的文件名
        #
        def download(self, from_oss, out_path):
            # 文件不存在返回False
            if not self.exists(from_oss):
                return False
    
            # 未指定输出路径,默认为当前路径下保存
            out = from_oss if out_path is None else out_path
            self._bucket.get_object_to_file(from_oss, out)
            
            return True
    
        # Stream
        #
        # [Params]
        # oss_key: 上传对象Key
        # oss_value: 上传内容
        #
        def put(self, oss_key, oss_value):
            self._bucket.put_object(oss_key, oss_value)
            return True
    
        # Stream
        #
        # [Params]
        # oss_key: 上传对象Key
        #
        def stream(self, oss_key):
            # oss对象不存在则返回None
            if not self.exists(oss_key):
                return None
    
            # 返回流式对象
            return self._bucket.get_object(oss_key)
    
    
        # 文件是否存在
        #
        # [Params]
        # oss_file_name: oss的文件名
        #
        def exists(self, oss_file_name):
            return self._bucket.object_exists(oss_file_name)
            
    
        # 删除文件
        #
        # [Params]
        # oss_file_name: oss的文件名
        #
        async def delete(self, oss_file_name):
            self._bucket.delete_object(oss_file_name)
            
            return True
    
        def signedUrl(self, oss_file_name, HTTP_METHOD='GET', expiredTime=60):
            result = self._bucket.sign_url(HTTP_METHOD, oss_file_name, expiredTime)
            result.replace("-internal","")
            return result

    对上面的函数进行解释和说明

    上传文件

    self._bucket.put_object_from_file(oss_file_name, from_file) 接受的是本地的文件名称
    可以这样使用
    bucket.put_object_from_file('./example.jpg','./example.jpg')
    

      

    下载文件

    out = from_oss if out_path is None else out_path
    self._bucket.get_object_to_file(from_oss, out)

    第一个参数是fromOss,第二个参数是本地的输出路径

    bucket.get_object_to_file('example.jpg', 'example2.jpg')
    

      

    上传文件(以流的形式)

    self._bucket.put_object(oss_key, oss_value)
    需要打开的操作
    with open(oss2.to_unicode('本地座右铭.txt'), 'rb') as f:
        bucket.put_object('云上座右铭.txt', f)
    

    下载文件(以流的形式)

    def stream(self, oss_key):
    # oss对象不存在则返回None
    if not self.exists(oss_key):
    return None

    # 返回流式对象
    return self._bucket.get_object(oss_key)
    直接去指定oss文件上的文件名称
    result=bucket.get_object('5a055c56705deb3b31d3bcab.json')
    print(result.read())
    

    判断oss上是否存在这个文件 

    直接把需要判断的文件名当参数进行判断即可 

    def exists(self, oss_file_name):
        return self._bucket.object_exists(oss_file_name)

    删除oss上的文件

    async def delete(self, oss_file_name):
        self._bucket.delete_object(oss_file_name)
    

    生成一个临时的签名url供客户端下载

    bucket.sign_url('请求的方法get或post', 'oss上的文件名', '过期的时间')
        def signedUrl(self, oss_file_name, HTTP_METHOD='GET', expiredTime=60):
            result = self._bucket.sign_url(HTTP_METHOD, oss_file_name, expiredTime)
            result.replace("-internal","")
            return result



  • 相关阅读:
    JDK的KeyTool和KeyStore等加密相关
    关于分布式事务的随笔[待续]
    Netty实例几则
    Disruptor快速入门
    Java获取系统环境信息
    JDK的BIO, NIO, AIO
    四种常用IO模型
    JDK的多线程与并发库
    递归转换为迭代的一种通用方式
    二叉树的java实现
  • 原文地址:https://www.cnblogs.com/crazymagic/p/8926370.html
Copyright © 2011-2022 走看看