  • Hadoop fs -put with bandwidth limit (brute-force version)

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *    http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    // scalastyle:off println
    package com.weibo.tools
    
    import java.io.{BufferedInputStream, FileInputStream}
    import java.net.URI
    import java.util.concurrent.TimeUnit
    
    import org.apache.hadoop.conf.{Configuration => hdfsConfig}
    import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
    import org.apache.hadoop.io.IOUtils
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Bandwidthlimited_local2HDFS_Writer {
      val kiloByte = 1024
      def upload_one_buffer(inStream : java.io.BufferedInputStream,
        outputStream : org.apache.hadoop.fs.FSDataOutputStream,
        log_buffer : Array[Byte],
        pre_buffer_sum : Long,
        totalSize : Long
      ) : Long = {
        val readSize = inStream.read(log_buffer)
        val buffer_sum = pre_buffer_sum + readSize
        // Write only the bytes actually read instead of copying the buffer with splitAt.
        outputStream.write(log_buffer, 0, readSize)
        outputStream.flush
        TimeUnit.MILLISECONDS.sleep(999)
        // println(s"${inStream} uploading. ${buffer_sum} uploaded. readSize : ${readSize}. ${buffer_sum * 100 / totalSize}% finished. ")
        buffer_sum
      }
      def LocalLog2HDFS_Writer(sc : SparkContext, 
        localSrcPath : String, 
        remoteTarPath : String,
        bandwidth : String
      ) : Long = {
        // HDFS append must be explicitly enabled before hdfs.append() is used below.
        sc.hadoopConfiguration.setBoolean("dfs.support.append",true)
        val hdfs = FileSystem.get(new URI("/"), sc.hadoopConfiguration)
        val filePath = new Path(remoteTarPath)
        val inStream = new BufferedInputStream(new FileInputStream(localSrcPath))
        // available() returns an Int, so this size estimate only works for files under 2 GB.
        val totalSize = inStream.available
        // Create the target file if it does not exist yet; append() requires an existing file.
        hdfs.exists(filePath) match {
          case false => hdfs.create(filePath).close
          case true => println(hdfs.getFileStatus(filePath).toString)
        }
        val outputStream = hdfs.append(filePath)
        // One buffer of `bandwidth` KB is written per ~1 s sleep, capping the average rate.
        val buffer_size = kiloByte * bandwidth.toInt
        val log_buffer = new Array[Byte](buffer_size)
        var buffer_sum = 0L
        try {
            // Full buffers: read buffer_size bytes, append them to HDFS, then sleep ~1 s,
            // so the average upload rate stays near `bandwidth` KB/s.
            while(inStream.available >= buffer_size) {
              val readSize = inStream.read(log_buffer)
              buffer_sum += readSize
              outputStream.write(log_buffer, 0, readSize)
              outputStream.flush
              outputStream.hflush
              println(s"${localSrcPath} uploading. ${buffer_sum} uploaded. readSize : ${readSize}. ${buffer_sum * 100 / totalSize}% finished. ")
              TimeUnit.MILLISECONDS.sleep(999)
            }
            // Final, partially filled buffer.
            if(inStream.available > 0) {
              val readSize = inStream.read(log_buffer)
              buffer_sum += readSize
              outputStream.write(log_buffer, 0, readSize)
              outputStream.flush
              println(s"${localSrcPath} uploading. ${buffer_sum} uploaded. readSize : ${readSize}. ${buffer_sum * 100 / totalSize}% finished. ")
            }
          } finally {
            inStream.close
            outputStream.close
          }  
          buffer_sum
      }
      def Local2HDFS_Writer(sc : SparkContext, args: Array[String]) : Long = {
        val helper_info = """    the file that localSrcPath points to is limited to 1.999G
        usage: Bandwidthlimited_local2HDFS_Writer localSrcPath remoteTarPath bandwidth (in KB, e.g. 10)"""
        println(helper_info)
        require(args.size >= 3, helper_info)
        val localSrcPath = args(0)
        val remoteTarPath = args(1)
        val bandwidth = args(2)
        LocalLog2HDFS_Writer(sc, localSrcPath, remoteTarPath, bandwidth)
      }
      def LocalLogReducer2HDFS(sc : SparkContext, taskList : List[(String, String)], bandwidth : String) : Int = {
        var sum = 0
        // Use foreach rather than iterator.map: the lazy map was never forced,
        // so no upload ran and sum stayed 0.
        taskList.foreach{
          case (localSrcPath, remoteTarPath) =>
            LocalLog2HDFS_Writer(sc, localSrcPath, remoteTarPath, bandwidth)
            sum += 1
        }
        sum
      }
      def LocalLogReducer(sc : SparkContext, srcParentPath : String, bandwidth : String) = {}
    
      def main(args: Array[String]) {
    
        val conf = new SparkConf()
          .setAppName("Bandwidthlimited_local2HDFS_Writer")
          .setMaster("local[1]")
        val sc = new SparkContext(conf)
        Local2HDFS_Writer(sc, args)
        sc.stop()
      }
    }

    https://github.com/Suanec/Betn_repo/blob/32d56acd3b57efc15573389619ed7793efdf298c/joyCodes/assembly_lib/src/main/scala/Bandwidthlimited_local2HDFS_Writer.scala
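
    For reference, a minimal sketch of driving the batch entry point (LocalLogReducer2HDFS) from a driver program, reusing the imports from the file above; the local and HDFS paths are placeholders:

        val conf = new SparkConf()
          .setAppName("Bandwidthlimited_local2HDFS_Writer")
          .setMaster("local[1]")
        val sc = new SparkContext(conf)
        // Upload two local log files at roughly 10 KB/s each (paths are hypothetical).
        val tasks = List(("/data/a.log", "/logs/a.log"), ("/data/b.log", "/logs/b.log"))
        val uploaded = Bandwidthlimited_local2HDFS_Writer.LocalLogReducer2HDFS(sc, tasks, "10")
        println(s"$uploaded files uploaded")
        sc.stop()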

    A brute-force version: to get the functionality working first, it uses Spark + Scala on top of the Hadoop API to implement a bandwidth-limited upload. Known issues:

    1. HDFS officially regards append as unsafe and does not recommend it for production use.

    2. The rate limit is enforced by throttling the stream reads and writes (each write of bandwidth KB is followed by a ~1 s sleep), so the instantaneous throughput may oscillate, but the average matches the target.

    3. The bandwidth limit is specified in KB; keep that in mind.

    4. Because of how the input stream is read, only files up to 1.999 GB are currently guaranteed to work; larger files may hit progress-monitoring failures, duplicate uploads, corrupted output, and similar problems. A possible workaround is sketched below.
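
    Issue 4 comes from InputStream.available, which returns an Int and therefore cannot represent sizes of 2 GB or more. A minimal sketch of a workaround (a hypothetical helper, not part of the original code) takes the total size from java.io.File.length(), which returns a Long, and drives the loop off the return value of read instead of available:

        // Hypothetical variant of the upload loop without the 2 GB limit of available().
        // Assumes the same imports and FileSystem/Path setup as LocalLog2HDFS_Writer above.
        def uploadWithLongSize(hdfs: FileSystem, localSrcPath: String, remoteTarPath: String, bandwidthKB: Int): Long = {
          val totalSize = new java.io.File(localSrcPath).length()  // Long, safe above 2 GB
          val inStream = new BufferedInputStream(new FileInputStream(localSrcPath))
          val filePath = new Path(remoteTarPath)
          if (!hdfs.exists(filePath)) hdfs.create(filePath).close()
          val outputStream = hdfs.append(filePath)
          val buffer = new Array[Byte](1024 * bandwidthKB)
          var uploaded = 0L
          try {
            var readSize = inStream.read(buffer)
            while (readSize > 0) {                    // read() returns -1 at end of file
              outputStream.write(buffer, 0, readSize)
              outputStream.hflush()
              uploaded += readSize
              println(s"$localSrcPath: $uploaded of $totalSize bytes (${uploaded * 100 / totalSize}%)")
              TimeUnit.MILLISECONDS.sleep(999)        // ~bandwidthKB KB per second on average
              readSize = inStream.read(buffer)
            }
          } finally {
            inStream.close()
            outputStream.close()
          }
          uploaded
        }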

  • Original post: https://www.cnblogs.com/suanec/p/8463383.html