https://github.com/apache/spark/tree/master/core/src/main/scala/org/apache/spark/network
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/network/BlockDataManager.scala
package org.apache.spark.network

import scala.reflect.ClassTag

import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.{BlockId, StorageLevel}

private[spark] trait BlockDataManager {

  /**
   * Interface to get local block data. Throws an exception if the block cannot be found or
   * cannot be read successfully.
   */
  def getBlockData(blockId: BlockId): ManagedBuffer

  /**
   * Put the block locally, using the given storage level.
   *
   * Returns true if the block was stored and false if the put operation failed or the block
   * already existed.
   */
  def putBlockData(
      blockId: BlockId,
      data: ManagedBuffer,
      level: StorageLevel,
      classTag: ClassTag[_]): Boolean

  /**
   * Release locks acquired by [[putBlockData()]] and [[getBlockData()]].
   */
  def releaseLock(blockId: BlockId, taskAttemptId: Option[Long]): Unit
}
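BlockDataManager is the local side of the contract: it is what a transfer service calls to serve or store blocks on its own executor (in Spark the class that implements it is BlockManager). As an illustration only, here is a minimal, hypothetical in-memory implementation, not part of Spark, assuming a simple concurrent map and no block locking. It shows the documented semantics: getBlockData throws for an unknown block, putBlockData returns false when the block already exists, and releaseLock has nothing to do because no locks are taken.

package org.apache.spark.network

import scala.collection.concurrent.TrieMap
import scala.reflect.ClassTag

import org.apache.spark.SparkException
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.{BlockId, StorageLevel}

// Hypothetical, illustration-only implementation; Spark's real one is BlockManager.
private[spark] class InMemoryBlockDataManager extends BlockDataManager {

  private val blocks = TrieMap.empty[BlockId, ManagedBuffer]

  // Throws if the block is not present, as the trait's contract requires.
  override def getBlockData(blockId: BlockId): ManagedBuffer =
    blocks.getOrElse(blockId, throw new SparkException(s"Block $blockId not found"))

  // Returns false when the block already exists; the storage level and class tag
  // are ignored here because everything lives in memory.
  override def putBlockData(
      blockId: BlockId,
      data: ManagedBuffer,
      level: StorageLevel,
      classTag: ClassTag[_]): Boolean =
    blocks.putIfAbsent(blockId, data).isEmpty

  // Nothing to release in this sketch; BlockManager uses this to drop the read/write
  // lock a task attempt acquired on the block.
  override def releaseLock(blockId: BlockId, taskAttemptId: Option[Long]): Unit = ()
}

The real BlockManager additionally has to honour the requested StorageLevel (memory versus disk, serialized versus deserialized) and track per-task-attempt block locks, which is what releaseLock undoes there.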
package org.apache.spark.network

import java.io.Closeable
import java.nio.ByteBuffer

import scala.concurrent.{Future, Promise}
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag

import org.apache.spark.internal.Logging
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer, NioManagedBuffer}
import org.apache.spark.network.shuffle.{BlockFetchingListener, ShuffleClient, TempFileManager}
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.util.ThreadUtils

private[spark]
abstract class BlockTransferService extends ShuffleClient with Closeable with Logging {

  /**
   * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
   * local blocks or put local blocks.
   */
  def init(blockDataManager: BlockDataManager): Unit

  /**
   * Tear down the transfer service.
   */
  def close(): Unit

  /**
   * Port number the service is listening on, available only after [[init]] is invoked.
   */
  def port: Int

  /**
   * Host name the service is listening on, available only after [[init]] is invoked.
   */
  def hostName: String

  /**
   * Fetch a sequence of blocks from a remote node asynchronously,
   * available only after [[init]] is invoked.
   *
   * Note that this API takes a sequence so the implementation can batch requests, and does not
   * return a future so the underlying implementation can invoke onBlockFetchSuccess as soon as
   * the data of a block is fetched, rather than waiting for all blocks to be fetched.
   */
  override def fetchBlocks(
      host: String,
      port: Int,
      execId: String,
      blockIds: Array[String],
      listener: BlockFetchingListener,
      tempFileManager: TempFileManager): Unit

  /**
   * Upload a single block to a remote node, available only after [[init]] is invoked.
   */
  def uploadBlock(
      hostname: String,
      port: Int,
      execId: String,
      blockId: BlockId,
      blockData: ManagedBuffer,
      level: StorageLevel,
      classTag: ClassTag[_]): Future[Unit]

  /**
   * A special case of [[fetchBlocks]], as it fetches only one block and is blocking.
   *
   * It is also only available after [[init]] is invoked.
   */
  def fetchBlockSync(
      host: String,
      port: Int,
      execId: String,
      blockId: String,
      tempFileManager: TempFileManager): ManagedBuffer = {
    // A monitor for the thread to wait on.
    val result = Promise[ManagedBuffer]()
    fetchBlocks(host, port, execId, Array(blockId),
      new BlockFetchingListener {
        override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
          result.failure(exception)
        }
        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
          data match {
            case f: FileSegmentManagedBuffer =>
              result.success(f)
            case _ =>
              val ret = ByteBuffer.allocate(data.size.toInt)
              ret.put(data.nioByteBuffer())
              ret.flip()
              result.success(new NioManagedBuffer(ret))
          }
        }
      }, tempFileManager)
    ThreadUtils.awaitResult(result.future, Duration.Inf)
  }

  /**
   * Upload a single block to a remote node, available only after [[init]] is invoked.
   *
   * This method is similar to [[uploadBlock]], except this one blocks the thread
   * until the upload finishes.
   */
  def uploadBlockSync(
      hostname: String,
      port: Int,
      execId: String,
      blockId: BlockId,
      blockData: ManagedBuffer,
      level: StorageLevel,
      classTag: ClassTag[_]): Unit = {
    val future = uploadBlock(hostname, port, execId, blockId, blockData, level, classTag)
    ThreadUtils.awaitResult(future, Duration.Inf)
  }
}
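BlockTransferService couples both sides of the exchange: after init it serves blocks out of the given BlockDataManager and can fetch or upload blocks against remote executors; in Spark the concrete subclass is the Netty-based NettyBlockTransferService. The sketch below is a hypothetical caller (the object and method names are invented for illustration) that shows the asynchronous fetchBlocks contract: the listener fires once per block as soon as that block arrives, rather than the call returning one future for the whole batch. fetchBlockSync above is the same API wrapped in a Promise that the calling thread blocks on.

import java.util.concurrent.CountDownLatch

import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.network.shuffle.{BlockFetchingListener, TempFileManager}

// Hypothetical helper, for illustration only.
object BlockFetchExample {

  def fetchAll(
      transferService: BlockTransferService, // assumed to be already init()-ed
      host: String,
      port: Int,
      execId: String,
      blockIds: Array[String],
      tempFileManager: TempFileManager): Unit = {
    val remaining = new CountDownLatch(blockIds.length)

    // fetchBlocks takes the whole batch so the implementation can group requests,
    // and reports each block individually through the listener as soon as it lands.
    transferService.fetchBlocks(host, port, execId, blockIds,
      new BlockFetchingListener {
        override def onBlockFetchSuccess(blockId: String, data: ManagedBuffer): Unit = {
          println(s"fetched $blockId (${data.size} bytes)")
          remaining.countDown()
        }
        override def onBlockFetchFailure(blockId: String, exception: Throwable): Unit = {
          println(s"failed to fetch $blockId: $exception")
          remaining.countDown()
        }
      },
      tempFileManager) // Spark supplies a TempFileManager when fetched blocks may be written to disk

    // Block the calling thread until every block has either arrived or failed.
    remaining.await()
  }
}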