Spark (53): A First Try at Spark RPC

    For basic usage there is really only one point to grasp:

    It is the master/slave pattern at work: the driver is the master and the executors are the slaves.

    For an executor to talk to the driver, it must first obtain the driver's EndpointRef; every call to the driver goes through that EndpointRef.

    When the driver starts, it registers an Endpoint service and exposes its own IP and port. To build the driver's EndpointRef, the executor essentially needs just two parameters: the driver's host (IP) and port, as sketched below.
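
    In API terms, that hand-off boils down to two calls: setupEndpoint on the driver side and setupEndpointRef on the executor side. A minimal sketch (it reuses the HelloEndpoint class and service name defined later in this post):

    import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}

    // Driver side: register the endpoint under a well-known name; it becomes
    // reachable at this RpcEnv's host and port.
    def registerOnDriver(rpcEnv: RpcEnv): RpcEndpointRef =
      rpcEnv.setupEndpoint("hello-rpc-service", new HelloEndpoint(rpcEnv))

    // Executor side: build a reference to the driver's endpoint from just the
    // driver's host, port, and the endpoint's name.
    def lookupFromExecutor(rpcEnv: RpcEnv, driverHost: String, driverPort: Int): RpcEndpointRef =
      rpcEnv.setupEndpointRef(RpcAddress(driverHost, driverPort), "hello-rpc-service")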

    Import the Maven dependency:

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.4.0</version>
            </dependency>
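
    If the project builds with sbt instead of Maven, the equivalent dependency (an assumption about your build setup) would be:

    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.4.0"

    One caveat worth stating up front: the org.apache.spark.rpc API (RpcEnv, RpcEndpoint, and friends) is marked private[spark] in Spark 2.4, so to compile examples like the ones below you generally need to place your sources in a package under org.apache.spark.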

    Define the RPC server's IP (localhost), port (57992), and service name (hello-rpc-service):

    // Shared connection settings used by both the RPC server and the client.
    object HelloRpcSettings {
      val rpcName = "hello-rpc-service"
      val port = 57992
      val hostname="localhost"
    
      def getName() = {
        rpcName
      }
    
      def getPort(): Int = {
        port
      }
    
      def getHostname():String={
        hostname
      }
    }
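
    Hard-coded constants are fine for a demo; in a real driver/executor deployment the client would more likely read the driver's address from configuration. A sketch, assuming the standard spark.driver.host and spark.driver.port properties (the helper itself is made up for illustration):

    import org.apache.spark.SparkConf

    // Hypothetical helper: resolve the endpoint address from SparkConf,
    // falling back to the demo constants above.
    def driverAddress(conf: SparkConf): (String, Int) = {
      val host = conf.get("spark.driver.host", HelloRpcSettings.getHostname())
      val port = conf.getInt("spark.driver.port", HelloRpcSettings.getPort())
      (host, port)
    }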

    Define the RPC Endpoint class and the message case classes SayHi/SayBye:

    case class SayHi(msg: String)
    
    case class SayBye(msg: String)
    
    import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}
    
    class HelloEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
      override def onStart(): Unit = {
        println(rpcEnv.address)
        println("start hello endpoint")
      }
    
      // Handles one-way messages delivered with RpcEndpointRef.send (no reply).
      override def receive: PartialFunction[Any, Unit] = {
        case SayHi(msg) =>
          println(s"receive $msg" )
      }
    
      // Handles messages sent with ask/askSync; must answer via context.reply
      // (or context.sendFailure).
      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        case SayHi(msg) => {
          println(s"receive $msg")
          context.reply(s"hi, $msg")
        }
        case SayBye(msg) => {
          println(s"receive $msg")
          context.reply(s"bye, $msg")
        }
      }
    
      override def onStop(): Unit = {
        println("stop hello endpoint")
      }
    }
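
    Spark also ships a ThreadSafeRpcEndpoint variant that guarantees messages are processed one at a time, so mutable endpoint state needs no explicit locking. A minimal sketch (the counting endpoint is a made-up example, not part of the demo above):

    import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}

    // ThreadSafeRpcEndpoint serializes message handling, so this counter is
    // safe without synchronization.
    class CountingEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
      private var greeted = 0

      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        case SayHi(msg) =>
          greeted += 1
          context.reply(s"hi, $msg (greeting #$greeted)")
      }
    }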

    Define the RPC service provider:

    import org.apache.spark.{SecurityManager, SparkConf, SparkContext, SparkEnv}
    import org.apache.spark.rpc._
    import org.apache.spark.sql.SparkSession
    
    object RpcServerTest {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        val sparkSession = SparkSession.builder().config(conf).master("local[*]").appName("test rpc").getOrCreate()
        val sparkContext: SparkContext = sparkSession.sparkContext
        val sparkEnv: SparkEnv = sparkContext.env
    
        // The eight-argument overload of RpcEnv.create is (name, bindAddress,
        // advertiseAddress, port, conf, securityManager, numUsableCores, clientMode);
        // clientMode = false starts this RpcEnv as a server.
        val rpcEnv = RpcEnv.create(HelloRpcSettings.getName(), HelloRpcSettings.getHostname(), HelloRpcSettings.getHostname(), HelloRpcSettings.getPort(), conf,
          sparkEnv.securityManager, 1, false)
    
        val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
        rpcEnv.setupEndpoint(HelloRpcSettings.getName(), helloEndpoint)
    
        // Serve RPC requests until the env is shut down.
        rpcEnv.awaitTermination()
      }
    }
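
    awaitTermination blocks forever here, so onStop never fires in this demo. If you want a clean shutdown path, one option (a sketch, not taken from the original) is to keep the RpcEndpointRef that setupEndpoint returns and stop the endpoint explicitly from a JVM shutdown hook:

    // Sketch: explicit teardown so onStop() runs. rpcEnv.stop(ref) unregisters
    // the endpoint; rpcEnv.shutdown() closes the whole RpcEnv.
    val ref = rpcEnv.setupEndpoint(HelloRpcSettings.getName(), new HelloEndpoint(rpcEnv))
    sys.addShutdownHook {
      rpcEnv.stop(ref)
      rpcEnv.shutdown()
    }
    rpcEnv.awaitTermination()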

    Define the RPC service consumer:

    import org.apache.spark.{SparkConf, SparkContext, SparkEnv}
    import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}
    import org.apache.spark.sql.SparkSession
    
    import scala.concurrent.duration.Duration
    import scala.concurrent.{Await, Future}
    
    object RpcClientTest {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf()
        val sparkSession = SparkSession.builder().config(conf).master("local[*]").appName("test rpc").getOrCreate()
        val sparkContext: SparkContext = sparkSession.sparkContext
        val sparkEnv: SparkEnv = sparkContext.env
    
        // clientMode = false means this RpcEnv starts a server of its own as well;
        // port 57992 is already taken by the provider, which is why the log below
        // shows the consumer falling back to port 57993.
        val rpcEnv: RpcEnv = RpcEnv.create(HelloRpcSettings.getName(), HelloRpcSettings.getHostname(), HelloRpcSettings.getPort(), conf, sparkEnv.securityManager, false)
        // Look up the remote endpoint by its address and registered name.
        val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress(HelloRpcSettings.getHostname(), HelloRpcSettings.getPort()), HelloRpcSettings.getName())

        // Execution context for the Future callback below.
        import scala.concurrent.ExecutionContext.Implicits.global

        // One-way, fire-and-forget message: handled by the endpoint's receive.
        endPointRef.send(SayHi("test send"))

        // Asynchronous request/response: handled by receiveAndReply.
        val future: Future[String] = endPointRef.ask[String](SayHi("neo"))
        future.onComplete {
          case scala.util.Success(value) => println(s"Got the result = $value")
          case scala.util.Failure(e) => println(s"Got error: $e")
        }
        Await.result(future, Duration.apply("30s"))

        // Blocking request/response: waits for the reply or throws on timeout.
        val res = endPointRef.askSync[String](SayBye("test askSync"))
        println(res)
    
        sparkSession.stop()
      }
    
    }
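
    Both ask and askSync also accept an explicit timeout. In Spark 2.4 that is an org.apache.spark.rpc.RpcTimeout built from a config key plus a default value; a sketch:

    import org.apache.spark.rpc.RpcTimeout

    // Per-call timeout instead of relying on the default spark.rpc.askTimeout.
    val timeout = RpcTimeout(conf, "spark.rpc.askTimeout", "10s")
    val reply: String = endPointRef.askSync[String](SayBye("with timeout"), timeout)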

    Start the RPC service provider. Its console output:

    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    19/06/28 14:50:12 INFO SparkContext: Running Spark version 2.4.0
    19/06/28 14:50:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    19/06/28 14:50:12 INFO SparkContext: Submitted application: test rpc
    19/06/28 14:50:12 INFO SecurityManager: Changing view acls to: boco
    19/06/28 14:50:12 INFO SecurityManager: Changing modify acls to: boco
    19/06/28 14:50:12 INFO SecurityManager: Changing view acls groups to: 
    19/06/28 14:50:12 INFO SecurityManager: Changing modify acls groups to: 
    19/06/28 14:50:12 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(boco); groups with view permissions: Set(); users  with modify permissions: Set(boco); groups with modify permissions: Set()
    19/06/28 14:50:13 INFO Utils: Successfully started service 'sparkDriver' on port 64621.
    19/06/28 14:50:13 INFO SparkEnv: Registering MapOutputTracker
    19/06/28 14:50:13 INFO SparkEnv: Registering BlockManagerMaster
    19/06/28 14:50:13 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
    19/06/28 14:50:13 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
    19/06/28 14:50:13 INFO DiskBlockManager: Created local directory at C:\Users\boco\AppData\Local\Temp\blockmgr-7128dde8-9c46-4580-bb72-c2161ba65bf7
    19/06/28 14:50:13 INFO MemoryStore: MemoryStore started with capacity 901.8 MB
    19/06/28 14:50:13 INFO SparkEnv: Registering OutputCommitCoordinator
    19/06/28 14:50:13 INFO Utils: Successfully started service 'SparkUI' on port 4040.
    19/06/28 14:50:13 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://DESKTOP-JL4FSCV:4040
    19/06/28 14:50:13 INFO Executor: Starting executor ID driver on host localhost
    19/06/28 14:50:13 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 64642.
    19/06/28 14:50:13 INFO NettyBlockTransferService: Server created on DESKTOP-JL4FSCV:64642
    19/06/28 14:50:13 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
    19/06/28 14:50:13 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, DESKTOP-JL4FSCV, 64642, None)
    19/06/28 14:50:13 INFO BlockManagerMasterEndpoint: Registering block manager DESKTOP-JL4FSCV:64642 with 901.8 MB RAM, BlockManagerId(driver, DESKTOP-JL4FSCV, 64642, None)
    19/06/28 14:50:13 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, DESKTOP-JL4FSCV, 64642, None)
    19/06/28 14:50:13 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, DESKTOP-JL4FSCV, 64642, None)
    19/06/28 14:50:13 INFO Utils: Successfully started service 'hello-rpc-service' on port 57992.
    localhost:57992
    start hello endpoint

    Start the RPC service consumer. Its console output:

    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    19/06/28 14:53:53 INFO SparkContext: Running Spark version 2.4.0
    19/06/28 14:53:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    19/06/28 14:53:54 INFO SparkContext: Submitted application: test rpc
    19/06/28 14:53:54 INFO SecurityManager: Changing view acls to: boco
    19/06/28 14:53:54 INFO SecurityManager: Changing modify acls to: boco
    19/06/28 14:53:54 INFO SecurityManager: Changing view acls groups to: 
    19/06/28 14:53:54 INFO SecurityManager: Changing modify acls groups to: 
    19/06/28 14:53:54 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(boco); groups with view permissions: Set(); users  with modify permissions: Set(boco); groups with modify permissions: Set()
    19/06/28 14:53:55 INFO Utils: Successfully started service 'sparkDriver' on port 64818.
    19/06/28 14:53:55 INFO SparkEnv: Registering MapOutputTracker
    19/06/28 14:53:55 INFO SparkEnv: Registering BlockManagerMaster
    19/06/28 14:53:55 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
    19/06/28 14:53:55 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
    19/06/28 14:53:55 INFO DiskBlockManager: Created local directory at C:\Users\boco\AppData\Local\Temp\blockmgr-6a0b8e7f-86d2-4bb8-b45c-7c04deabcb91
    19/06/28 14:53:55 INFO MemoryStore: MemoryStore started with capacity 901.8 MB
    19/06/28 14:53:55 INFO SparkEnv: Registering OutputCommitCoordinator
    19/06/28 14:53:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
    19/06/28 14:53:55 INFO Utils: Successfully started service 'SparkUI' on port 4041.
    19/06/28 14:53:55 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://DESKTOP-JL4FSCV:4041
    19/06/28 14:53:55 INFO Executor: Starting executor ID driver on host localhost
    19/06/28 14:53:55 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 64840.
    19/06/28 14:53:55 INFO NettyBlockTransferService: Server created on DESKTOP-JL4FSCV:64840
    19/06/28 14:53:55 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
    19/06/28 14:53:55 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, DESKTOP-JL4FSCV, 64840, None)
    19/06/28 14:53:55 INFO BlockManagerMasterEndpoint: Registering block manager DESKTOP-JL4FSCV:64840 with 901.8 MB RAM, BlockManagerId(driver, DESKTOP-JL4FSCV, 64840, None)
    19/06/28 14:53:55 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, DESKTOP-JL4FSCV, 64840, None)
    19/06/28 14:53:55 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, DESKTOP-JL4FSCV, 64840, None)
    19/06/28 14:53:55 WARN Utils: Service 'hello-rpc-service' could not bind on port 57992. Attempting port 57993.
    19/06/28 14:53:55 INFO Utils: Successfully started service 'hello-rpc-service' on port 57993.
    19/06/28 14:53:55 INFO TransportClientFactory: Successfully created connection to localhost/127.0.0.1:57992 after 31 ms (0 ms spent in bootstraps)
    bye, test askSync
    Got the result = hi, neo
    19/06/28 14:53:55 INFO SparkUI: Stopped Spark web UI at http://DESKTOP-JL4FSCV:4041
    19/06/28 14:53:55 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    19/06/28 14:53:55 INFO MemoryStore: MemoryStore cleared
    19/06/28 14:53:55 INFO BlockManager: BlockManager stopped
    19/06/28 14:53:55 INFO BlockManagerMaster: BlockManagerMaster stopped
    19/06/28 14:53:55 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    19/06/28 14:53:55 INFO SparkContext: Successfully stopped SparkContext
    19/06/28 14:53:55 INFO ShutdownHookManager: Shutdown hook called
    19/06/28 14:53:55 INFO ShutdownHookManager: Deleting directory 

    At this point the RPC service provider prints the following (the IOException at the end is expected: it is the provider noticing that the consumer's JVM closed its connection on exit):

    receive test send
    receive neo
    receive test askSync
    19/06/28 14:53:56 WARN TransportChannelHandler: Exception in connection from /127.0.0.1:64865
    java.io.IOException: An existing connection was forcibly closed by the remote host
        at sun.nio.ch.SocketDispatcher.read0(Native Method)
        at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
        at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
        at sun.nio.ch.IOUtil.read(IOUtil.java:192)
        at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
        at io.netty.buffer.PooledUnsafeDirectByteBuf.setBytes(PooledUnsafeDirectByteBuf.java:288)
        at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1106)
        at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:343)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
        at java.lang.Thread.run(Thread.java:748)
Original article: https://www.cnblogs.com/yy3b2007com/p/11104065.html