  • Spark 2.0 shuffle service

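    The shuffle service is at the core of Spark. This post covers the path that does not use ExternalShuffleClient and walks through the overall architecture of the block transfer service. ShuffleClient is the foundation of the whole framework. It has two methods, init and fetchBlocks.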

    package org.apache.spark.network.shuffle;

    import java.io.Closeable;

    /** Provides an interface for reading shuffle files, either from an Executor or external service. */  
    public abstract class ShuffleClient implements Closeable {  
      
      /**  
       * Initializes the ShuffleClient, specifying this Executor's appId.  
       * Must be called before any other method on the ShuffleClient.  
       */  
      public void init(String appId) { }  
      
      /**  
       * Fetch a sequence of blocks from a remote node asynchronously,  
       *  
       * Note that this API takes a sequence so the implementation can batch requests, and does not  
       * return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as  
       * the data of a block is fetched, rather than waiting for all blocks to be fetched.  
       */  
      public abstract void fetchBlocks(  
          String host,  
          int port,  
          String execId,  
          String[] blockIds,  
          BlockFetchingListener listener);  
    }  
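To make the init/fetchBlocks contract concrete, here is a minimal Java sketch. It is not Spark code: `FetchListener`, `SimpleShuffleClient` and `InMemoryShuffleClient` are hypothetical, simplified stand-ins (a `ByteBuffer` replaces `ManagedBuffer`, and blocks come from an in-memory map instead of the network). It illustrates the two properties the Javadoc describes: `init` must run first, and a batched fetch reports each block through its own callback as soon as that block is ready, with no future for the batch as a whole.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShuffleClientDemo {
  // Simplified stand-in for BlockFetchingListener; ByteBuffer replaces ManagedBuffer.
  interface FetchListener {
    void onBlockFetchSuccess(String blockId, ByteBuffer data);
    void onBlockFetchFailure(String blockId, Throwable exception);
  }

  // Simplified stand-in for ShuffleClient: init(appId) first, then asynchronous batched fetches.
  abstract static class SimpleShuffleClient implements AutoCloseable {
    public void init(String appId) { }
    public abstract void fetchBlocks(String host, int port, String execId,
                                     String[] blockIds, FetchListener listener);
    @Override public void close() { }
  }

  // Hypothetical in-memory client: "remote" blocks live in a map, and each block
  // is reported through its own callback as soon as its lookup finishes.
  static final class InMemoryShuffleClient extends SimpleShuffleClient {
    private final Map<String, byte[]> store;
    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private volatile String appId;

    InMemoryShuffleClient(Map<String, byte[]> store) { this.store = store; }

    @Override public void init(String appId) { this.appId = appId; }

    @Override
    public void fetchBlocks(String host, int port, String execId,
                            String[] blockIds, FetchListener listener) {
      if (appId == null) {
        throw new IllegalStateException("init() must be called before fetchBlocks()");
      }
      // host, port and execId are ignored: everything is served from the local map.
      for (String id : blockIds) {
        pool.submit(() -> {
          byte[] bytes = store.get(id);
          if (bytes == null) {
            listener.onBlockFetchFailure(id, new IllegalArgumentException("unknown block " + id));
          } else {
            listener.onBlockFetchSuccess(id, ByteBuffer.wrap(bytes).asReadOnlyBuffer());
          }
        });
      }
    }

    @Override public void close() { pool.shutdown(); }
  }

  // Fetches `ids` in one batch and waits until every block has reported success or failure.
  static List<String> fetchAndCollect(String... ids) {
    Map<String, byte[]> store = new ConcurrentHashMap<>();
    store.put("shuffle_0_0_0", "a".getBytes(StandardCharsets.UTF_8));
    store.put("shuffle_0_1_0", "bb".getBytes(StandardCharsets.UTF_8));
    List<String> events = new CopyOnWriteArrayList<>();
    CountDownLatch pending = new CountDownLatch(ids.length);
    try (InMemoryShuffleClient client = new InMemoryShuffleClient(store)) {
      client.init("app-1");
      client.fetchBlocks("localhost", 0, "exec-1", ids, new FetchListener() {
        @Override public void onBlockFetchSuccess(String id, ByteBuffer data) {
          events.add("ok:" + id + ":" + data.remaining());
          pending.countDown();
        }
        @Override public void onBlockFetchFailure(String id, Throwable e) {
          events.add("fail:" + id);
          pending.countDown();
        }
      });
      pending.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return events;
  }

  public static void main(String[] args) {
    // The order may vary: two worker threads deliver the callbacks independently.
    System.out.println(fetchAndCollect("shuffle_0_0_0", "shuffle_0_1_0", "shuffle_0_2_0"));
  }
}
```

The latch in `fetchAndCollect` exists only because this demo wants every result before returning; a real caller would usually act on each block inside the callback instead.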

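    The BlockFetchingListener interface has two callbacks. onBlockFetchSuccess is called once for each block that is fetched successfully. Once it returns, the data is released automatically, so if the data is passed to another thread, the receiver must call retain() and release() itself or copy the data into a new buffer. onBlockFetchFailure is called at least once for each block whose fetch fails.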

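    package org.apache.spark.network.shuffle;

    import java.util.EventListener;

    import org.apache.spark.network.buffer.ManagedBuffer;

    public interface BlockFetchingListener extends EventListener {  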
      /**  
       * Called once per successfully fetched block. After this call returns, data will be released  
       * automatically. If the data will be passed to another thread, the receiver should retain()  
       * and release() the buffer on their own, or copy the data to a new buffer.  
       */  
      void onBlockFetchSuccess(String blockId, ManagedBuffer data);  
      
      /**  
       * Called at least once per block upon failures.  
       */  
      void onBlockFetchFailure(String blockId, Throwable exception);  
    }  
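The ownership rule above can be illustrated with a small Java sketch. `RefCountedBuffer` is a hypothetical stand-in for `ManagedBuffer` that models only `retain()`, `release()` and `nioByteBuffer()`, and `deliver` plays the role of the transport, which drops its reference as soon as `onBlockFetchSuccess` returns. The two helpers show the two safe ways to hand a block to another thread: copy it inside the callback, or `retain()` it and let the worker `release()` it.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class ListenerOwnershipDemo {
  // Hypothetical stand-in for ManagedBuffer: a ByteBuffer with a reference count.
  static final class RefCountedBuffer {
    private final ByteBuffer data;
    private final AtomicInteger refCnt = new AtomicInteger(1); // held by the "transport"

    RefCountedBuffer(ByteBuffer data) { this.data = data; }
    RefCountedBuffer retain() { refCnt.incrementAndGet(); return this; }
    void release() { refCnt.decrementAndGet(); }
    boolean isReleased() { return refCnt.get() <= 0; }

    ByteBuffer nioByteBuffer() {
      if (isReleased()) throw new IllegalStateException("buffer already released");
      return data.duplicate();
    }
  }

  interface SuccessListener {
    void onBlockFetchSuccess(String blockId, RefCountedBuffer data);
  }

  // Transport side: invoke the callback, then drop the transport's own reference.
  static void deliver(String blockId, RefCountedBuffer buf, SuccessListener listener) {
    try {
      listener.onBlockFetchSuccess(blockId, buf);
    } finally {
      buf.release();
    }
  }

  static byte[] drain(ByteBuffer b) {
    byte[] out = new byte[b.remaining()];
    b.get(out);
    return out;
  }

  // Option 1: copy inside the callback; the worker thread only touches the private copy.
  static byte[] copyThenHandOff(RefCountedBuffer buf) {
    ExecutorService worker = Executors.newSingleThreadExecutor();
    CompletableFuture<byte[]> result = new CompletableFuture<>();
    try {
      deliver("blk", buf, (id, data) -> {
        ByteBuffer src = data.nioByteBuffer();
        ByteBuffer copy = ByteBuffer.allocate(src.remaining());
        copy.put(src);
        copy.flip();
        worker.submit(() -> { result.complete(drain(copy)); });
      });
      return result.join();
    } finally {
      worker.shutdown();
    }
  }

  // Option 2: retain() before the hand-off; the worker calls release() when it is done.
  static byte[] retainThenHandOff(RefCountedBuffer buf) {
    ExecutorService worker = Executors.newSingleThreadExecutor();
    CompletableFuture<byte[]> result = new CompletableFuture<>();
    try {
      deliver("blk", buf, (id, data) -> {
        data.retain(); // keeps the buffer valid after this callback returns
        worker.submit(() -> {
          try {
            byte[] bytes = drain(data.nioByteBuffer());
            data.release(); // balances retain(), before the result is published
            result.complete(bytes);
          } catch (RuntimeException e) {
            data.release();
            result.completeExceptionally(e);
          }
        });
      });
      return result.join();
    } finally {
      worker.shutdown();
    }
  }

  public static void main(String[] args) {
    RefCountedBuffer a = new RefCountedBuffer(ByteBuffer.wrap(new byte[] {1, 2, 3}));
    System.out.println(Arrays.toString(copyThenHandOff(a)) + " released=" + a.isReleased());
    RefCountedBuffer b = new RefCountedBuffer(ByteBuffer.wrap(new byte[] {4, 5}));
    System.out.println(Arrays.toString(retainThenHandOff(b)) + " released=" + b.isReleased());
  }
}
```

Both lines printed by `main` end with `released=true`. The copy variant drops the only reference when the callback returns, while the retain variant keeps the buffer alive until the worker's matching `release()`. Handing `data` to the worker with neither a copy nor a `retain()` would let the worker read a buffer the transport has already released.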

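    BlockTransferService extends ShuffleClient. It declares the lifecycle and transfer methods (init, close, port, hostName, fetchBlocks, uploadBlock) and supplies shared implementations of the blocking variants, fetchBlockSync and uploadBlockSync.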

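    package org.apache.spark.network

    import java.io.Closeable
    import java.nio.ByteBuffer

    import scala.concurrent.{Future, Promise}
    import scala.concurrent.duration.Duration
    import scala.reflect.ClassTag

    import org.apache.spark.internal.Logging
    import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
    import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient}
    import org.apache.spark.storage.{BlockId, StorageLevel}
    import org.apache.spark.util.ThreadUtils

    private[spark]  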
    abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {  
      
      /**  
       * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch  
       * local blocks or put local blocks.  
       */  
      def init(blockDataManager: BlockDataManager): Unit  
      
      /**  
       * Tear down the transfer service.  
       */  
      def close(): Unit  
      
      /**  
       * Port number the service is listening on, available only after [[init]] is invoked.  
       */  
      def port: Int  
      
      /**  
       * Host name the service is listening on, available only after [[init]] is invoked.  
       */  
      def hostName: String  
      
      /**  
       * Fetch a sequence of blocks from a remote node asynchronously,  
       * available only after [[init]] is invoked.  
       *  
       * Note that this API takes a sequence so the implementation can batch requests, and does not  
       * return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as  
       * the data of a block is fetched, rather than waiting for all blocks to be fetched.  
       */
      override def fetchBlocks(
          host: String,
          port: Int,
          execId: String,
          blockIds: Array[String],
          listener: BlockFetchingListener): Unit

      /**
       * Upload a single block to a remote node, available only after [[init]] is invoked.
       */
      def uploadBlock(
          hostname: String,
          port: Int,
          execId: String,
          blockId: BlockId,
          blockData: ManagedBuffer,
          level: StorageLevel,
          classTag: ClassTag[_]): Future[Unit]

      /**
       * A special case of [[fetchBlocks]], as it fetches only one block and is blocking.
       *
       * It is also only available after [[init]] is invoked.
       */
      def fetchBlockSync(host: String, port: Int, execId: String, blockId: String): ManagedBuffer = {
        // A monitor for the thread to wait on.
        val result = Promise[ManagedBuffer]()
        fetchBlocks(host, port, execId, Array(blockId),
          new BlockFetchingListener {
            override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
              result.failure(exception)
            }
            override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
              // Copy the block: `data` is released automatically once this callback returns.
              val ret = ByteBuffer.allocate(data.size.toInt)
              ret.put(data.nioByteBuffer())
              ret.flip()
              result.success(new NioManagedBuffer(ret))
            }
          })
        ThreadUtils.awaitResult(result.future, Duration.Inf)
      }

      /**
       * Upload a single block to a remote node, available only after [[init]] is invoked.
       *
       * This method is similar to [[uploadBlock]], except this one blocks the thread
       * until the upload finishes.
       */
      def uploadBlockSync(
          hostname: String,
          port: Int,
          execId: String,
          blockId: BlockId,
          blockData: ManagedBuffer,
          level: StorageLevel,
          classTag: ClassTag[_]): Unit = {
        val future = uploadBlock(hostname, port, execId, blockId, blockData, level, classTag)
        ThreadUtils.awaitResult(future, Duration.Inf)
      }
    }
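`fetchBlockSync` turns the asynchronous, callback-based `fetchBlocks` into a blocking call. It fetches a one-element batch, completes a `Promise` from the listener, and waits on the resulting future. The Java sketch below mirrors that pattern with `CompletableFuture`. `Listener` and `AsyncFetcher` are hypothetical stand-ins backed by an in-memory map, not Spark classes, and `join()` plays the role of `ThreadUtils.awaitResult(..., Duration.Inf)`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class FetchBlockSyncDemo {
  // Simplified stand-in for BlockFetchingListener; ByteBuffer replaces ManagedBuffer.
  interface Listener {
    void onBlockFetchSuccess(String blockId, ByteBuffer data);
    void onBlockFetchFailure(String blockId, Throwable exception);
  }

  // Hypothetical asynchronous fetcher standing in for a BlockTransferService.
  static final class AsyncFetcher {
    private final Map<String, byte[]> store;
    private final ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
      Thread t = new Thread(r, "fetch-worker");
      t.setDaemon(true); // never keeps the JVM alive on its own
      return t;
    });

    AsyncFetcher(Map<String, byte[]> store) { this.store = store; }

    // Asynchronous and batched: every block is reported through the listener on the worker thread.
    void fetchBlocks(String[] blockIds, Listener listener) {
      for (String id : blockIds) {
        pool.submit(() -> {
          byte[] bytes = store.get(id);
          if (bytes == null) {
            listener.onBlockFetchFailure(id, new IllegalStateException("no such block: " + id));
          } else {
            listener.onBlockFetchSuccess(id, ByteBuffer.wrap(bytes));
          }
        });
      }
    }

    // Blocking wrapper in the style of fetchBlockSync: fetch a one-element batch,
    // complete a future from the callbacks, then wait for that future.
    ByteBuffer fetchBlockSync(String blockId) {
      CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
      fetchBlocks(new String[] {blockId}, new Listener() {
        @Override public void onBlockFetchSuccess(String id, ByteBuffer data) {
          // Copy: the listener contract lets the sender reclaim `data` after we return.
          ByteBuffer copy = ByteBuffer.allocate(data.remaining());
          copy.put(data.duplicate());
          copy.flip();
          result.complete(copy);
        }
        @Override public void onBlockFetchFailure(String id, Throwable exception) {
          // Returns false (rather than throwing) if the future was already completed.
          result.completeExceptionally(exception);
        }
      });
      try {
        return result.join(); // waits without a timeout, like Duration.Inf
      } catch (CompletionException e) {
        throw new RuntimeException("fetch of " + blockId + " failed", e.getCause());
      }
    }

    void close() { pool.shutdown(); }
  }

  public static void main(String[] args) {
    AsyncFetcher fetcher = new AsyncFetcher(
        Collections.singletonMap("rdd_0_0", "hello".getBytes(StandardCharsets.UTF_8)));
    try {
      System.out.println(StandardCharsets.UTF_8.decode(fetcher.fetchBlockSync("rdd_0_0")));
      fetcher.fetchBlockSync("rdd_9_9"); // unknown block: the wrapper rethrows the failure
    } catch (RuntimeException e) {
      System.out.println("failed: " + e.getCause().getMessage());
    } finally {
      fetcher.close();
    }
  }
}
```

As in the Scala code, the success callback copies the data before completing the future, because the listener contract allows the buffer to be released once the callback returns. One difference: Scala's `Promise.failure` throws `IllegalStateException` if the promise is already completed, whereas `CompletableFuture.completeExceptionally` just returns `false`, so in this sketch a repeated failure callback for the same block is harmless. `uploadBlockSync` follows the same idea more simply, since `uploadBlock` already returns a `Future` that it only has to wait on.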

    NettyBlockTransferService extends BlockTransferService.

  • Original article: https://www.cnblogs.com/itboys/p/9185113.html