  • Ceph storage via the S3 API (with resumable upload support)

    The company is about to adopt Ceph storage. After some research, I decided to implement it against Amazon's S3 API. The implementation class is as follows:

    /**
     * Title:        S3Manager
     * Description:  S3 API implementation for Ceph storage. Reference docs:
     * https://docs.aws.amazon.com/zh_cn/AmazonS3/latest/dev/RetrievingObjectUsingJava.html
     * http://docs.ceph.org.cn/radosgw/s3/
     * author:       xu jun
     * date:         2018/10/22
     */
    @Slf4j
    @Service
    public class S3Manager extends StorageManagerBase implements StorageManager {
        private final UKID ukid;
        private final S3ClientConfig s3ClientConfig;
        private final RedisManage redisManage;
        private AmazonS3 amazonClient;
    
        @Autowired
        public S3Manager(UKID ukid, S3ClientConfig s3ClientConfig, RedisManage redisManage) {
            this.ukid = ukid;
            this.s3ClientConfig = s3ClientConfig;
            this.redisManage = redisManage;
        }
    
        private AmazonS3 getAmazonClient() {
            if (amazonClient == null) {
                String accessKey = s3ClientConfig.getAccessKey();
                String secretKey = s3ClientConfig.getSecretKey();
                String endpoint = s3ClientConfig.getEndPoint();
    
                AWSCredentials credentials = new BasicAWSCredentials(accessKey, secretKey);
                ClientConfiguration clientConfig = new ClientConfiguration();
                clientConfig.setProtocol(Protocol.HTTP);
    
                AmazonS3 conn = AmazonS3ClientBuilder.standard()
                        .withClientConfiguration(clientConfig)
                        .withCredentials(new AWSStaticCredentialsProvider(credentials))
                        .withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpoint, ""))
                        .withPathStyleAccessEnabled(true)
                        .build();
    
                // Make sure the bucket has been created
                checkBucket(conn);
                amazonClient = conn;
            }
            return amazonClient;
        }
    
        @Override
        public String uploadFile(byte[] fileData, String extension) {
            log.info("Storage s3 api, upload file start");
    
            // Generate a random sequence number to use as the object key
            long fileId = ukid.getGeneratorID();
            String fileName = Long.toString(fileId);
            // Bucket name
            String bucketName = s3ClientConfig.getBucketName();
            AmazonS3 conn = getAmazonClient();

            // Set the content length explicitly; with a null ObjectMetadata the SDK
            // has to buffer the whole stream in memory and logs a warning
            ObjectMetadata metadata = new ObjectMetadata();
            metadata.setContentLength(fileData.length);
            PutObjectResult result = conn.putObject(bucketName, fileName, new ByteArrayInputStream(fileData), metadata);
            log.info("Storage s3 api, put object result :{}", result);
    
            log.info("Storage s3 api, upload file end, file name:" + fileName);
            return fileName;
        }
    
        @Override
        public String uploadAppenderFile(byte[] fileData, String extension) {
            log.info("Storage s3 api, upload appender file start");
    
            // Generate a random sequence number to use as the object key
            long ukId = ukid.getGeneratorID();
            String fileName = Long.toString(ukId);
            // Bucket name
            String bucketName = s3ClientConfig.getBucketName();
            AmazonS3 conn = getAmazonClient();
            List<PartETag> partETags = new ArrayList<>();
            // Initiate the multipart upload
            InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest(bucketName, fileName);
            InitiateMultipartUploadResult initResponse = conn.initiateMultipartUpload(initRequest);
            String uploadId = initResponse.getUploadId();
    
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(fileData);
            Integer contentLength = fileData.length;
            // Upload the first part
            UploadPartRequest uploadPartRequest = new UploadPartRequest()
                    .withBucketName(bucketName)
                    .withKey(fileName)
                    .withUploadId(uploadId)
                    .withPartNumber(1)
                    .withPartSize(contentLength)
                    .withInputStream(byteArrayInputStream);
            UploadPartResult uploadPartResult = conn.uploadPart(uploadPartRequest);
    
            try {
                byteArrayInputStream.close();
            } catch (IOException e) {
                throw FileCenterExceptionConstants.INTERNAL_IO_EXCEPTION;
            }
            partETags.add(uploadPartResult.getPartETag());
            Integer partNumber = uploadPartResult.getPartNumber();
    
            S3CacheMode cacheMode = new S3CacheMode();
            cacheMode.setPartETags(partETags);
            cacheMode.setPartNumber(partNumber);
            cacheMode.setUploadId(uploadId);
            redisManage.set(fileName, cacheMode);
    
            log.info("Storage s3 api, upload appender file end, fileName: {}", fileName);
            return fileName;
        }
    
        @Override
        public void uploadChunkFile(ChunkFileSaveParams chunkFileSaveParams) {
            log.info("Storage s3 api, upload chunk file start");
    
            String fileName = chunkFileSaveParams.getFileAddress();
            Result result = redisManage.get(fileName);
            JSONObject jsonObject = (JSONObject) result.getData();
            if (jsonObject == null) {
                throw FileCenterExceptionConstants.CACHE_DATA_NOT_EXIST;
            }
            S3CacheMode cacheMode = jsonObject.toJavaObject(S3CacheMode.class);
            Integer partNumber = cacheMode.getPartNumber();
            String uploadId = cacheMode.getUploadId();
            List<PartETag> partETags = cacheMode.getPartETags();
    
            // Bucket name
            String bucketName = s3ClientConfig.getBucketName();
            AmazonS3 conn = getAmazonClient();
            ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(chunkFileSaveParams.getBytes());
            Integer contentLength = chunkFileSaveParams.getBytes().length;
    
            UploadPartRequest uploadPartRequest = new UploadPartRequest()
                    .withBucketName(bucketName)
                    .withKey(fileName)
                    .withUploadId(uploadId)
                    .withPartNumber(partNumber + 1)
                    .withPartSize(contentLength)
                    .withInputStream(byteArrayInputStream);
    
            UploadPartResult uploadPartResult = conn.uploadPart(uploadPartRequest);
            partETags.add(uploadPartResult.getPartETag());
            partNumber = uploadPartResult.getPartNumber();
    
            try {
                byteArrayInputStream.close();
            } catch (IOException e) {
                throw FileCenterExceptionConstants.INTERNAL_IO_EXCEPTION;
            }
    
            S3CacheMode cacheModeUpdate = new S3CacheMode();
            cacheModeUpdate.setPartETags(partETags);
            cacheModeUpdate.setPartNumber(partNumber);
            cacheModeUpdate.setUploadId(uploadId);
            redisManage.set(fileName, cacheModeUpdate);
    
            if (chunkFileSaveParams.getChunk().equals(chunkFileSaveParams.getChunks() - 1)) {
                // Last chunk: complete the multipart upload so the stored object is created
                CompleteMultipartUploadRequest compRequest = new CompleteMultipartUploadRequest(bucketName, fileName,
                        uploadId, partETags);
                conn.completeMultipartUpload(compRequest);
            }
    
            log.info("Storage s3 api, upload chunk file end");
        }
    
        @Override
        public byte[] downloadFile(String fileName) {
            log.info("Storage s3 api, download file start");
            // Bucket name
            String bucketName = s3ClientConfig.getBucketName();
            AmazonS3 conn = getAmazonClient();
            S3Object object;
            if (conn.doesObjectExist(bucketName, fileName)) {
                object = conn.getObject(bucketName, fileName);
            } else {
                throw FileCenterExceptionConstants.OBJECT_NOT_EXIST;
            }
            log.debug("Storage s3 api, get object result :{}", object);
    
            byte[] fileByte;
            InputStream inputStream = object.getObjectContent();
            try {
                fileByte = IOUtils.toByteArray(inputStream);
            } catch (IOException e) {
                throw FileCenterExceptionConstants.INTERNAL_IO_EXCEPTION;
            } finally {
                // close exactly once, in the finally block
                try {
                    inputStream.close();
                } catch (IOException e) {
                    log.error(e.getMessage());
                }
            }
            log.info("Storage s3 api, download file end");
            return fileByte;
        }
    
        @Override
        public byte[] downloadFile(String fileName, long fileOffset, long fileSize) {
            log.info("Storage s3 api, download file by block start");
            // Bucket name
            String bucketName = s3ClientConfig.getBucketName();
            AmazonS3 conn = getAmazonClient();
            S3Object object;
            if (conn.doesObjectExist(bucketName, fileName)) {
                // Ranged download; withRange is inclusive at both ends, so request
                // bytes [fileOffset, fileOffset + fileSize - 1] to read exactly fileSize bytes
                GetObjectRequest getObjectRequest = new GetObjectRequest(bucketName, fileName)
                        .withRange(fileOffset, fileOffset + fileSize - 1);
                object = conn.getObject(getObjectRequest);
            } else {
                throw FileCenterExceptionConstants.OBJECT_NOT_EXIST;
            }
            log.info("Storage s3 api, get object result :{}", object);
    
            // Read the object content
            byte[] buf;
            InputStream in = object.getObjectContent();
            try {
                buf = inputToByte(in, (int) fileSize);
            } catch (IOException e) {
                throw FileCenterExceptionConstants.INTERNAL_IO_EXCEPTION;
            } finally {
                try {
                    in.close();
                } catch (IOException e) {
                    log.error(e.getMessage());
                }
            }
            log.info("Storage s3 api, download file by block end");
            return buf;
        }
    
        @Override
        public String fileSecret(String filePath) {
            return null;
        }
    
        @Override
        public String fileDecrypt(String filePath) {
            return null;
        }
    
        @Override
        public String getDomain() {
            return null;
        }
    
    
        /**
         * Check whether the bucket exists; create it if it does not
         *
         * @param conn client connection
         */
        private void checkBucket(AmazonS3 conn) {
            // Bucket name
            String bucketName = s3ClientConfig.getBucketName();
            if (conn.doesBucketExist(bucketName)) {
                log.debug("Storage s3 api, bucket found: " + bucketName);
            } else {
                log.warn("Storage s3 api, bucket does not exist, creating it: " + bucketName);
                conn.createBucket(bucketName);
            }
        }
    
        /**
         * Read an InputStream into a byte[]
         *
         * @param inStream input stream
         * @param fileSize file size, used as the read buffer size
         * @return the bytes read
         * @throws IOException on read failure
         */
        private static byte[] inputToByte(InputStream inStream, int fileSize) throws IOException {
            ByteArrayOutputStream swapStream = new ByteArrayOutputStream();
            byte[] buff = new byte[fileSize];
            int rc;
            while ((rc = inStream.read(buff, 0, fileSize)) > 0) {
                swapStream.write(buff, 0, rc);
            }
            return swapStream.toByteArray();
        }
    
        /**
         * Debug helper: dumps the contents of an InputStream to the log
         *
         * @param input input stream
         * @throws IOException on read failure
        private static void displayTextInputStream(InputStream input) throws IOException {
            // Read the text input stream one line at a time and display each line.
            BufferedReader reader = new BufferedReader(new InputStreamReader(input));
            String line;
            while ((line = reader.readLine()) != null) {
                log.info(line);
            }
        }
         */
    }
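
    The S3CacheMode class used above to keep multipart progress in Redis is not shown in the post. A minimal sketch of what it could look like, assuming Lombok @Data generates the getters and setters the code calls (the field names are inferred from those calls):

    /**
     * Multipart-upload progress cached in Redis between chunk requests.
     * Hypothetical sketch: field names are inferred from the calls in S3Manager.
     */
    @Data
    public class S3CacheMode implements Serializable {
        /** uploadId returned by initiateMultipartUpload */
        private String uploadId;
        /** number of the most recently uploaded part */
        private Integer partNumber;
        /** ETags of every part uploaded so far, needed to complete the upload */
        private List<PartETag> partETags;
    }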

    The business interfaces to be implemented include multipart upload (with resumable-upload support), chunked download, and so on; the class above is the low-level storage layer and contains no business logic.
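
    As an illustration only, a business-layer caller might drive a resumable upload roughly like this (the ChunkFileSaveParams setter names and the surrounding wiring are assumptions, not code from the post):

    // Hypothetical business-layer driver: the first chunk opens the multipart upload,
    // later chunks can be retried at any time because progress is cached in Redis
    public String uploadResumable(List<byte[]> chunks, String extension) {
        String fileName = s3Manager.uploadAppenderFile(chunks.get(0), extension);
        for (int i = 1; i < chunks.size(); i++) {
            ChunkFileSaveParams params = new ChunkFileSaveParams();
            params.setFileAddress(fileName);       // key under which progress is cached
            params.setBytes(chunks.get(i));
            params.setChunk(i);                    // zero-based index of this chunk
            params.setChunks(chunks.size());       // total number of chunks
            s3Manager.uploadChunkFile(params);     // the last chunk completes the upload
        }
        return fileName;
    }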

    Maven dependency:

            <!-- SDK used to talk to Ceph storage via the S3 API -->
            <dependency>
                <groupId>com.amazonaws</groupId>
                <artifactId>aws-java-sdk</artifactId>
                <version>1.11.433</version>
            </dependency>
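
    The aws-java-sdk artifact above bundles clients for every AWS service; if only S3 is needed, the S3-only artifact is a lighter alternative (a suggestion, not what the post used):

            <dependency>
                <groupId>com.amazonaws</groupId>
                <artifactId>aws-java-sdk-s3</artifactId>
                <version>1.11.433</version>
            </dependency>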

    Development notes:

      1. The Java S3 documentation on the Ceph site is thin and outdated, and can no longer really serve as a reference, so look up the API and code samples on the Amazon documentation site instead (a Chinese translation is provided, which is appreciated).

      2. The S3 API itself has no append operation, so implementing chunked upload is rather awkward (not as convenient as FastDFS or OSS); multipart upload is the workaround used above.

      3. The minimum part size for multipart upload defaults to 5 MB; it can be changed in the server-side configuration (see the config sketch after this list).

      4. If a domain name is used as the endpoint, the bucket name is treated as a subdomain by default (virtual-hosted style), which requires extra DNS resolution, so it is not recommended here. To address the bucket as part of the path instead, enable path-style access in the client configuration (withPathStyleAccessEnabled(true) above).
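
    For point 3, the 5 MB floor is enforced on the RGW side, not by the client. A minimal ceph.conf sketch for lowering it is below; the option name rgw_multipart_min_part_size and the section name are assumptions to verify against the deployed Ceph/RGW version:

        # Hypothetical ceph.conf excerpt - verify against your Ceph/RGW version
        [client.rgw.gateway-node]
        # default is 5242880 (5 MB); lower it to accept 1 MB parts
        rgw_multipart_min_part_size = 1048576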

  • Original post: https://www.cnblogs.com/xujanus/p/9851604.html