zoukankan      html  css  js  c++  java
  • spark下测试akka的分布式通讯功能

    采用的spark版本为1.1.0

    scala版本为2.10.4

    • 编写scala类文件myactors.scala:

      package bluejoe
      
      import akka.actor._
      import com.typesafe.config.ConfigFactory
      import akka.remote.RemoteScope
      
      class LocalActor extends akka.actor.Actor {
      
        //Get a reference to the remote actor
        val remoteActor = context.actorFor("akka://RemoteNodeApp@10.0.224.170:2552/user/remoteActor")
        def receive: Receive = {
          case message: String =>
           println("<<<<<<<<<<<<<"+message)  
      }
      }
      
      class RemoteActor extends akka.actor.Actor {
        def receive: Receive = {
          case message: String =>
            // Get reference to the message sender and reply back
            println(">>>>>>>>>>>>"+message)
        }
      }


    • 编译生成class文件:

      scalac -classpath /usr/local/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar myactors.scala

    • 启动spark-shell,注意指定classpath:

      spark-shell --driver-class-path /root


    • 准备2个配置文件,remote.conf内容如下:

      RemoteSys {
      	akka {
      		actor {
      	    		provider = "akka.remote.RemoteActorRefProvider"
      	  	}
      	   remote {
      	    transport = "akka.remote.netty.NettyRemoteTransport"
      	    netty {
      	      hostname = "10.0.224.170"
      	      port = 2552
      	    }
      	  }
      	}
      }


    • client.conf的内容如下:

      LocalSys {
              akka {
                      actor {
                      provider = "akka.remote.RemoteActorRefProvider"
                      }
              }
      }


    • 在spark-shell中输入server端代码:

      import bluejoe._
      import akka.actor._
      import com.typesafe.config.ConfigFactory
      
      
      val system = ActorSystem("RemoteNodeApp", ConfigFactory.parseFile(new java.io.File("/root/remote.conf")).getConfig("RemoteSys"))
      val remoteActor = system.actorOf(Props[RemoteActor], name = "remoteActor")
      


    • 启动另外一个spark-shell,输入client端代码:

      import bluejoe._
      import akka.actor._
      import com.typesafe.config.ConfigFactory
      
      // load the configuration
      val config = ConfigFactory.parseFile(new java.io.File("/root/client.conf")).getConfig("LocalSys")
      val system = ActorSystem("LocalNodeApp", config)
      val clientActor = system.actorOf(Props[LocalActor])
      clientActor ! "Hello"
      Thread.sleep(4000)
      system.shutdown()


  • 相关阅读:
    怎样启用或关闭Windows的Telnet功能
    打开SQL Server 配置管理器时出现了问题 ,无法连接到WMI提供程序,您没有权限或者该服务器无法访问
    服务器迁移注意什么?
    测试技能进阶图谱
    Mac
    Maven
    .net发送邮箱
    MongoDB 未添加索引 当数据量较大时 分页查询报错问题解决
    代码保存
    这技术网站不错
  • 原文地址:https://www.cnblogs.com/bluejoe/p/5115840.html
Copyright © 2011-2022 走看看