采用的spark版本为1.1.0
scala版本为2.10.4
- 编写scala类文件myactors.scala:
package bluejoe

import akka.actor._
import com.typesafe.config.ConfigFactory
import akka.remote.RemoteScope

/**
 * Client-side actor. On receiving a String it logs it locally and then
 * forwards it to the server-side actor over Akka remoting.
 *
 * FIX(review): the original obtained `remoteActor` but never sent anything
 * to it, so the remoting demo never actually crossed the network.
 */
class LocalActor extends akka.actor.Actor {
  // Get a reference to the remote actor by its full remote path.
  // NOTE(review): `actorFor` and the plain "akka://" scheme match the
  // pre-2.3 Akka API bundled with Spark 1.1.0 — confirm against the
  // shaded Akka version actually on the classpath.
  val remoteActor = context.actorFor("akka://RemoteNodeApp@10.0.224.170:2552/user/remoteActor")

  def receive: Receive = {
    case message: String =>
      println("<<<<<<<<<<<<<" + message)
      // Forward to the server so the message is visibly delivered remotely.
      // RemoteActor deliberately does NOT reply; a reply matching
      // `case message: String` here would re-forward it and loop forever.
      remoteActor ! message
  }
}

/**
 * Server-side actor. Logs every String it receives. It intentionally does
 * not answer the sender — see the loop note in [[LocalActor]].
 */
class RemoteActor extends akka.actor.Actor {
  def receive: Receive = {
    case message: String =>
      println(">>>>>>>>>>>>" + message)
  }
}
- 编译生成class文件:
scalac -classpath /usr/local/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar myactors.scala
- 启动spark-shell,注意指定classpath:
spark-shell --driver-class-path /root
- 准备2个配置文件,remote.conf内容如下:
RemoteSys { akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { transport = "akka.remote.netty.NettyRemoteTransport" netty { hostname = "10.0.224.170" port = 2552 } } } }
- client.conf的内容如下:
LocalSys { akka { actor { provider = "akka.remote.RemoteActorRefProvider" } } }
- 在spark-shell中输入server端代码:
import bluejoe._
import akka.actor._
import com.typesafe.config.ConfigFactory

// Read the server-side actor-system settings from remote.conf, scoped to
// the "RemoteSys" section.
val serverConfig = ConfigFactory
  .parseFile(new java.io.File("/root/remote.conf"))
  .getConfig("RemoteSys")

// Bring up the remote-enabled actor system and register the server actor
// under the name "remoteActor" so clients can look it up by path.
val system = ActorSystem("RemoteNodeApp", serverConfig)
val remoteActor = system.actorOf(Props[RemoteActor], name = "remoteActor")
- 启动另外一个spark-shell,输入client端代码:
import bluejoe._
import akka.actor._
import com.typesafe.config.ConfigFactory

// Load the client-side configuration ("LocalSys" section of client.conf).
val config = ConfigFactory
  .parseFile(new java.io.File("/root/client.conf"))
  .getConfig("LocalSys")

// Start a local actor system and create the client actor inside it.
val system = ActorSystem("LocalNodeApp", config)
val clientActor = system.actorOf(Props[LocalActor])

// Fire the demo message, give remoting a moment to deliver it, then shut
// the actor system down.
clientActor ! "Hello"
Thread.sleep(4000)
system.shutdown()