zoukankan      html  css  js  c++  java
  • Spark 通信原理

    https://github.com/apache/spark/tree/master/core/src/main/scala/org/apache/spark/network

    https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala

    https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala

    package org.apache.spark.network
    
    import scala.reflect.ClassTag
    
    import org.apache.spark.network.buffer.ManagedBuffer
    import org.apache.spark.storage.{BlockId, StorageLevel}
    
    private[spark] trait BlockDataManager {

      /**
       * Looks up a block held on this node and returns its contents.
       *
       * @param blockId identifier of the local block to read
       * @return a buffer exposing the block's data
       * @throws Exception when the block is not present or cannot be read successfully
       */
      def getBlockData(blockId: BlockId): ManagedBuffer

      /**
       * Stores a block on this node with the requested storage level.
       *
       * @param blockId  identifier under which the block is stored
       * @param data     the block's contents
       * @param level    storage level to apply to the block
       * @param classTag class tag accompanying the block data (presumably describes the block's
       *                 element type — TODO confirm against implementations)
       * @return `true` if the block was stored; `false` if the put failed or the block already
       *         existed
       */
      def putBlockData(
          blockId: BlockId,
          data: ManagedBuffer,
          level: StorageLevel,
          classTag: ClassTag[_]
      ): Boolean

      /**
       * Releases the locks that were acquired by [[putBlockData]] and [[getBlockData]].
       *
       * @param blockId       block whose locks are released
       * @param taskAttemptId optional task attempt id; presumably identifies the lock holder
       *                      (TODO confirm what `None` means in implementations)
       */
      def releaseLock(blockId: BlockId, taskAttemptId: Option[Long]): Unit
    }
    package org.apache.spark.network
    
    import java.io.Closeable
    import java.nio.ByteBuffer
    
    import scala.concurrent.{Future, Promise}
    import scala.concurrent.duration.Duration
    import scala.reflect.ClassTag
    
    import org.apache.spark.internal.Logging
    import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer}
    import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempFileManager}
    import org.apache.spark.storage.{BlockId, StorageLevel}
    import org.apache.spark.util.ThreadUtils
    
    private[spark]
    abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {

      /**
       * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
       * local blocks or put local blocks.
       */
      def init(blockDataManager: BlockDataManager): Unit

      /**
       * Tear down the transfer service.
       */
      def close(): Unit

      /**
       * Port number the service is listening on, available only after [[init]] is invoked.
       */
      def port: Int

      /**
       * Host name the service is listening on, available only after [[init]] is invoked.
       */
      def hostName: String

      /**
       * Fetch a sequence of blocks from a remote node asynchronously,
       * available only after [[init]] is invoked.
       *
       * Note that this API takes a sequence so the implementation can batch requests, and does not
       * return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as
       * the data of a block is fetched, rather than waiting for all blocks to be fetched.
       */
      override def fetchBlocks(
          host: String,
          port: Int,
          execId: String,
          blockIds: Array[String],
          listener: BlockFetchingListener,
          tempFileManager: TempFileManager): Unit

      /**
       * Upload a single block to a remote node, available only after [[init]] is invoked.
       *
       * @return a future that completes when the upload finishes
       */
      def uploadBlock(
          hostname: String,
          port: Int,
          execId: String,
          blockId: BlockId,
          blockData: ManagedBuffer,
          level: StorageLevel,
          classTag: ClassTag[_]): Future[Unit]

      /**
       * A special case of [[fetchBlocks]], as it fetches only one block and is blocking.
       *
       * It is also only available after [[init]] is invoked.
       *
       * @return the fetched block. A FileSegmentManagedBuffer is returned as-is; any other buffer
       *         is copied into a heap ByteBuffer wrapped in a NioManagedBuffer.
       * @throws Exception the failure reported to the listener, or any failure raised while
       *         copying the block, surfaced through ThreadUtils.awaitResult
       */
      def fetchBlockSync(
          host: String,
          port: Int,
          execId: String,
          blockId: String,
          tempFileManager: TempFileManager): ManagedBuffer = {
        // Completed by the listener below; the calling thread waits on its future.
        val result = Promise[ManagedBuffer]()
        fetchBlocks(host, port, execId, Array(blockId),
          new BlockFetchingListener {
            override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
              // tryFailure: a late or repeated callback must not throw IllegalStateException
              // back into the fetcher's thread.
              result.tryFailure(exception)
            }
            override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
              data match {
                case f: FileSegmentManagedBuffer =>
                  result.trySuccess(f)
                case _ =>
                  // Every failure while copying (oversized block, allocation failure, read error)
                  // must still complete the promise. Otherwise the awaitResult below, which
                  // waits with Duration.Inf, would block forever. The throwable is not swallowed:
                  // it is handed to the waiting thread, which rethrows it.
                  try {
                    val size = data.size
                    // ByteBuffer capacity is an Int; without this check `toInt` would silently
                    // wrap and produce a negative or truncated buffer size.
                    if (size > Int.MaxValue) {
                      throw new IllegalArgumentException(
                        s"Block $blockId is $size bytes, which exceeds the maximum size " +
                          s"(${Int.MaxValue} bytes) of a single in-memory buffer")
                    }
                    val ret = ByteBuffer.allocate(size.toInt)
                    ret.put(data.nioByteBuffer())
                    ret.flip()
                    result.trySuccess(new NioManagedBuffer(ret))
                  } catch {
                    case t: Throwable => result.tryFailure(t)
                  }
              }
            }
          }, tempFileManager)
        ThreadUtils.awaitResult(result.future, Duration.Inf)
      }

      /**
       * Upload a single block to a remote node, available only after [[init]] is invoked.
       *
       * This method is similar to [[uploadBlock]], except this one blocks the thread
       * until the upload finishes.
       *
       * @throws Exception any failure of the future returned by [[uploadBlock]], surfaced through
       *         ThreadUtils.awaitResult
       */
      def uploadBlockSync(
          hostname: String,
          port: Int,
          execId: String,
          blockId: BlockId,
          blockData: ManagedBuffer,
          level: StorageLevel,
          classTag: ClassTag[_]): Unit = {
        val future = uploadBlock(hostname, port, execId, blockId, blockData, level, classTag)
        ThreadUtils.awaitResult(future, Duration.Inf)
      }
    }
  • 相关阅读:
    ASP.NET MVC 5 安全性和创建用户角色
    使用ENTITY FRAMEWORK 6以正确的方式管理DBCONTEXT:深入指南
    C#读取二进制格式的shapefile
    ASP.NET MVC 应用程序初学者常见问题汇总
    ASP.NET MVC 应用程序中使用CKEditor 4 的步骤
    html5学习笔记2
    html5学习笔记
    c#接口定义与应用
    Django ORM 优化心得
    携程的那点事
  • 原文地址:https://www.cnblogs.com/rsapaper/p/7867914.html
Copyright © 2011-2022 走看看