zoukankan      html  css  js  c++  java
  • scala学习之实现RPC通信

    最近学习scala,个人感觉非常灵活,实现rpc通信非常简单,函数式编程比较烧脑

    1.搭建工程 创建scala maven 工程  

    项目pom文件

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.xiaot</groupId>
      <artifactId>scala-demo</artifactId>
      <version>1.0-SNAPSHOT</version>
      <inceptionYear>2008</inceptionYear>
      <properties>
        <scala.version>2.10.6</scala.version>
      </properties>
    
      <repositories>
        <repository>
          <id>scala-tools.org</id>
          <name>Scala-Tools Maven2 Repository</name>
          <url>http://scala-tools.org/repo-releases</url>
        </repository>
      </repositories>
    
      <pluginRepositories>
        <pluginRepository>
          <id>scala-tools.org</id>
          <name>Scala-Tools Maven2 Repository</name>
          <url>http://scala-tools.org/repo-releases</url>
        </pluginRepository>
      </pluginRepositories>
    
      <dependencies>
        <!-- Scala standard library; its version is driven by the scala.version property. -->
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-library</artifactId>
          <version>${scala.version}</version>
        </dependency>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.11</version>
          <scope>test</scope>
        </dependency>
        <dependency>
          <groupId>org.specs</groupId>
          <artifactId>specs</artifactId>
          <version>1.2.5</version>
          <scope>test</scope>
        </dependency>
        <!-- Kept in lock-step with scala-library via the shared property instead of a
             hard-coded copy of the version number. -->
        <dependency>
          <groupId>org.scala-lang</groupId>
          <artifactId>scala-actors</artifactId>
          <version>${scala.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-actor -->
        <!-- The _2.10 suffix of the Akka artifacts must match the Scala binary version
             selected by scala.version. -->
        <dependency>
          <groupId>com.typesafe.akka</groupId>
          <artifactId>akka-actor_2.10</artifactId>
          <version>2.3.14</version>
        </dependency>

        <dependency>
          <groupId>com.typesafe.akka</groupId>
          <artifactId>akka-remote_2.10</artifactId>
          <version>2.3.14</version>
        </dependency>
      </dependencies>
    
      <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
          <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <executions>
              <execution>
                <goals>
                  <goal>compile</goal>
                  <goal>testCompile</goal>
                </goals>
              </execution>
            </executions>
            <configuration>
              <scalaVersion>${scala.version}</scalaVersion>
              <args>
                <arg>-target:jvm-1.5</arg>
              </args>
            </configuration>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-eclipse-plugin</artifactId>
            <configuration>
              <downloadSources>true</downloadSources>
              <buildcommands>
                <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
              </buildcommands>
              <additionalProjectnatures>
                <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
              </additionalProjectnatures>
              <classpathContainers>
                <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
              </classpathContainers>
            </configuration>
          </plugin>
        </plugins>
      </build>
      <reporting>
        <plugins>
          <plugin>
            <groupId>org.scala-tools</groupId>
            <artifactId>maven-scala-plugin</artifactId>
            <configuration>
              <scalaVersion>${scala.version}</scalaVersion>
            </configuration>
          </plugin>
        </plugins>
      </reporting>
    </project>

    2. 创建Master.scala  作为主节点保持跟Worker进行通信  提供Worker注册信息存储和心跳检测

    package com.xiaot.master
    
    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    import com.xiaot.worker.WorkerInfo
    import scala.concurrent.duration._
    import scala.collection.mutable
    
    
    /**
      * Master node of a small RPC demo built on Akka remoting.
      *
      * Stores the workers that registered, records the time of their latest
      * heartbeat, and periodically evicts every worker whose latest heartbeat is
      * older than `CHECK_TIME_OUT` milliseconds.
      *
      * Created by xiaot on 2018/4/2.
      *
      * @param host host name used to build the master URL sent back to workers
      * @param port port used to build the master URL sent back to workers
      */
    class Master(val host:String,val port :Int)  extends Actor{

      // Registered workers, keyed by worker id.
      val idToWork = new mutable.HashMap[String,WorkerInfo]()
      // The same workers as a set; this is what the periodic liveness check scans.
      val workers = new mutable.HashSet[WorkerInfo]()
      // Period of the liveness check and maximum accepted heartbeat age, in milliseconds.
      val CHECK_TIME_OUT = 15000

      // Handle of the periodic liveness check, kept so that postStop can cancel it.
      private var timeoutCheck: Option[akka.actor.Cancellable] = None

      /** Starts the periodic liveness check by sending `CheckTimeOutWorker` to this actor. */
      override def preStart(): Unit = {
        println("master preStart ")
        // The scheduler needs an implicit ExecutionContext; use this actor's dispatcher.
        import context.dispatcher
        timeoutCheck = Some(
          context.system.scheduler.schedule(0.millis, CHECK_TIME_OUT.millis, self, CheckTimeOutWorker))
      }

      /** Cancels the periodic liveness check so the timer does not outlive this actor. */
      override def postStop(): Unit = {
        timeoutCheck.foreach(_.cancel())
        timeoutCheck = None
      }

      override def receive: Receive = {
        case RegisterWorker(id, memory, cores) =>
          if (!idToWork.contains(id)) {
            val workerInfo = new WorkerInfo(id, memory, cores)
            // Count the registration itself as the first sign of life. Without this the
            // timestamp stays 0 and a liveness check that runs before the first heartbeat
            // would evict the worker, after which its heartbeats are ignored for good.
            workerInfo.lastHeartbeatTime = System.currentTimeMillis()
            idToWork(id) = workerInfo
            workers += workerInfo
          }
          println("worker:"+id+" regist to master success")
          // Acknowledge the registration; the worker starts its heartbeats on this reply.
          sender ! RegisteredWorker(s"akka.tcp://MasterSystem@$host:$port/user/master")

        case HeartBeat(id) =>
          // Heartbeats from ids that are not (or no longer) registered are ignored.
          idToWork.get(id).foreach { workerInfo =>
            workerInfo.lastHeartbeatTime = System.currentTimeMillis()
          }

        case CheckTimeOutWorker =>
          val currentTime = System.currentTimeMillis()
          // filter builds a new set, so removing from `workers` while looping is safe.
          val expiredWorkers = workers.filter(w => currentTime - w.lastHeartbeatTime > CHECK_TIME_OUT)
          for (w <- expiredWorkers) {
            idToWork -= w.id
            workers -= w
          }
          // Prints the number of workers still registered after the check.
          println(workers.size)
      }
    }
    
    /** Command-line launcher for the [[Master]] actor. */
    object Master{

      /**
        * Boots the "MasterSystem" actor system with remoting enabled and creates
        * the actor named "master" in it, then blocks until the system terminates.
        *
        * @param args `host port`: the host name and TCP port that Akka remoting binds to
        */
      def main(args: Array[String]): Unit = {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 2) {
          System.err.println("Usage: Master <host> <port>")
          sys.exit(1)
        }

        val host = args(0)
        val port = args(1).toInt

        val configStr =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
           """.stripMargin

        val config = ConfigFactory.parseString(configStr)
        val actorSystem = ActorSystem("MasterSystem",config)
        // Workers look this actor up under /user/master, so the name must stay "master".
        actorSystem.actorOf(Props(new Master(host,port)),"master")

        actorSystem.awaitTermination()
      }
    }

    Master运行参数:<host> <port>(依次为 Master 绑定的主机名和端口,例如 127.0.0.1 8888)

    3.创建Worker.scala  作为工作节点,启动时向master注册节点信息,注册成功后定期向master发送心跳

    package com.xiaot.worker
    
    import java.util.UUID
    
    import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    
    import scala.concurrent.duration._
    import com.xiaot.master.{HeartBeat, RegisterWorker, RegisteredWorker, SendHeartBeat}
    
    /**
      * Worker node of the RPC demo.
      *
      * On start it registers itself with the master; once the master acknowledges
      * the registration it sends a heartbeat every `HEART_INTERAL` milliseconds.
      *
      * Created by xiaot on 2018/4/3.
      *
      * @param masterHost host name of the master's actor system
      * @param port       port of the master's actor system (used to build the master path)
      * @param memory     memory figure reported to the master at registration
      * @param cores      core count reported to the master at registration
      */
    class Worker(val masterHost:String,val port:Int,val memory:Int,val cores:Int) extends Actor{

      // Selection pointing at the master actor; assigned in preStart.
      var  master :ActorSelection = _
      // Random id that identifies this worker towards the master.
      val workerId = UUID.randomUUID().toString
      // Heartbeat period in milliseconds.
      val  HEART_INTERAL=10000

      // Handle of the running heartbeat timer, if any, so it can be cancelled.
      private var heartbeatTask: Option[akka.actor.Cancellable] = None

      /** Resolves the master's actor path and sends it the registration request. */
      override def preStart(): Unit = {
        master = context.actorSelection(s"akka.tcp://MasterSystem@$masterHost:$port/user/master")
        master ! RegisterWorker(workerId,memory,cores)
      }

      /** Stops the heartbeat timer so it does not keep firing after this actor stopped. */
      override def postStop(): Unit = {
        heartbeatTask.foreach(_.cancel())
        heartbeatTask = None
      }

      override def receive: Receive = {

        case RegisteredWorker(masterUrl) =>
          println("worker:"+workerId+" 已经成功注册到"+masterUrl)
          // The scheduler needs an implicit ExecutionContext; use this actor's dispatcher.
          import context.dispatcher
          // A repeated acknowledgement must not stack a second timer on top of the first.
          heartbeatTask.foreach(_.cancel())
          // The timer messages this actor itself; the heartbeat is sent in the case below.
          heartbeatTask = Some(
            context.system.scheduler.schedule(0.millis, HEART_INTERAL.millis, self, SendHeartBeat))

        case SendHeartBeat =>
          println("send heartbeat to master")
          master ! HeartBeat(workerId)
      }
    }
    /** Command-line launcher for the [[Worker]] actor. */
    object Worker{

      /**
        * Boots the worker's actor system with remoting enabled, creates one
        * [[Worker]] actor in it and blocks until the system terminates.
        *
        * @param args `host port masterHost masterPort memory cores`: the first two are the
        *             address this worker binds to, the next two locate the master, and the
        *             last two are the figures reported at registration
        */
      def main(args: Array[String]): Unit = {
        // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
        if (args.length < 6) {
          System.err.println("Usage: Worker <host> <port> <masterHost> <masterPort> <memory> <cores>")
          sys.exit(1)
        }

        val host = args(0)
        val port = args(1).toInt
        val masterHost = args(2)
        val masterPort = args(3).toInt
        val memory = args(4).toInt
        val cores = args(5).toInt
        val configStr =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
           """.stripMargin

        // Neither value is reassigned, so both are vals.
        val config = ConfigFactory.parseString(configStr)
        val workerSystem = ActorSystem("WorderSystem",config)
        workerSystem.actorOf(Props(new Worker(masterHost,masterPort,memory,cores)))
        workerSystem.awaitTermination()
      }
    }

    Worker运行参数:<host> <port> <masterHost> <masterPort> <memory> <cores>(例如 127.0.0.1 9999 127.0.0.1 8888 1024 2)

    WorkerInfo.scala 用于保存工作节点的信息

    package com.xiaot.worker
    
    /**
      * Mutable bookkeeping record the master keeps for one registered worker.
      *
      * Created by xiaot on 2018/4/4.
      *
      * @param id     identifier the worker registered with
      * @param memory memory figure the worker reported at registration
      * @param cores  core count the worker reported at registration
      */
    class WorkerInfo(val id: String, val memory: Int, val cores: Int) {

      /** Time of the latest heartbeat, as epoch milliseconds; starts at 0. */
      var lastHeartbeatTime: Long = 0L
    }

     RemoteMessage.scala 用于远程信息发送,其中包括心跳检测等样例类

    package com.xiaot.master
    
    /**
      * Messages exchanged by the master and the workers of the RPC demo.
      *
      * Created by xiaot on 2018/4/4.
      */

    /** Marker for messages that travel between JVMs and therefore must be serializable. */
    trait RemoteMessage extends Serializable

    /** worker -> master: asks the master to register the worker `id` with its resources. */
    final case class RegisterWorker(id:String,memory:Int,cores:Int) extends RemoteMessage

    /** master -> worker: confirms the registration and carries the master's actor URL. */
    final case class RegisteredWorker(masterUrl:String) extends RemoteMessage

    /** worker -> master: periodic sign of life from the worker `id`. */
    final case class HeartBeat(id:String) extends RemoteMessage

    /** worker -> itself: timer tick telling the worker to send a [[HeartBeat]]. */
    case object SendHeartBeat

    /** master -> itself: timer tick telling the master to evict timed-out workers. */
    case object CheckTimeOutWorker

    4. 执行结果

  • 相关阅读:
    bzoj3531[Sdoi2014]旅行
    bzoj3212 Pku3468 A Simple Problem with Integers 线段树
    bzoj1858[Scoi2010]序列操作 线段树
    bzoj2243[SDOI2011]染色 树链剖分+线段树
    bzoj3038上帝造题的七分钟2
    bzoj1036[ZJOI2008]树的统计Count 树链剖分+线段树
    bzoj3211花神游历各国 线段树
    bzoj4596[Shoi2016]黑暗前的幻想乡 Matrix定理+容斥原理
    bzoj3129[Sdoi2013]方程 exlucas+容斥原理
    刷题总结——寻宝游戏(bzoj3991 dfs序)
  • 原文地址:https://www.cnblogs.com/shoutn/p/8717750.html
Copyright © 2011-2022 走看看