zoukankan      html  css  js  c++  java
  • scala快速入门04

    Scala从入门开始04

    1.分布式通信框架Akka

    • 对底层异步IO(NIO)封装,使用起来方便。Java中有Netty,Mina。Scala有Akka。

    1.1什么是ACTORS

    • Actor是用来收发消息的,一个Actor就是一个实例,可以创建多个Actor实现并发。所有的Actor都有管理者,它用于创建Actor和管理Actor。以后根据发送带类型的消息进行匹配,处理不同逻辑。用到Scala实现用到技术点:case class/case object。 Akka Actors遵循Actor模型,Actors并发编程不需要锁,而是通过通信的机制实现并发。

    1.2什么是Akka

    • Akka是JAVA虚拟机JVM平台上构建高并发、分布式和容错应用的工具包。Akka用Scala语言写成,同时提供了Scala和JAVA的开发接口。

      Akka处理并发的方法基于Actor模型。在Akka里,Actor之间通信的唯一机制就是消息传递

    1.3Akka的特点

    1. 单并发性  和  分布式
    2. 可恢复的,弹性的
    3. 高效的
    4. 弹性,去中心化(分散)
    5. 可扩展
    

    1.4 Akka快速入门

    • 负责管理的角色:ActorSystem

    • 负责通信:Actor

    • 发送的消息:case class 和 case object

    • 发消息的方式: 异步、同步

    • 示例演示步骤:

      1. 创建一个Maven工程

        • idea中New --> project--> Maven --> Next --> 填写项目名
      2. 当前创建项目为Java项目,需要导入scala插件

        <build>
                <sourceDirectory>src/main/scala</sourceDirectory>
                <testSourceDirectory>src/test/scala</testSourceDirectory>
                <plugins>
                    <!-- 编译scala -->
                    <plugin>
                        <groupId>net.alchim31.maven</groupId>
                        <artifactId>scala-maven-plugin</artifactId>
                        <version>3.2.2</version>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>compile</goal>
                                    <goal>testCompile</goal>
                                </goals>
                                <configuration>
                                    <args>
        <!--                                <arg>-make:transitive</arg>-->
                                        <arg>-dependencyfile</arg>
                                        <arg>${project.build.directory}/.scala_dependencies</arg>
                                    </args>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>
        
                    <!-- 打包插件 -->
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-shade-plugin</artifactId>
                        <version>2.4.3</version>
                        <executions>
                            <execution>
                                <phase>package</phase>
                                <goals>
                                    <goal>shade</goal>
                                </goals>
                                <configuration>
                                    <filters>
                                        <filter>
                                            <artifact>*:*</artifact>
                                            <excludes>
                                                <exclude>META-INF/*.SF</exclude>
                                                <exclude>META-INF/*.DSA</exclude>
                                                <exclude>META-INF/*.RSA</exclude>
                                            </excludes>
                                        </filter>
                                    </filters>
                                    <transformers>
                                        <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                            <resource>reference.conf</resource>
                                        </transformer>
                                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                            <mainClass>cn._51doit.rpc.Master</mainClass>
                                        </transformer>
                                    </transformers>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        
      3. 根据导入scala插件创建目录:main下创建scala文件夹,右键Make Directory as ---> Sources root。test下创建scala文件夹。右键Make Directory as ---> Sources test root。

      4. 右键项目->Add Frameworks Support->选择scala

      5. 引入scala依赖

        <!-- 导入scala的依赖 -->
            <properties>
                <maven.compiler.source>1.8</maven.compiler.source>
                <maven.compiler.target>1.8</maven.compiler.target>
                <encoding>UTF-8</encoding>
                <scala.version>2.11.12</scala.version>
                <scala.compat.version>2.11</scala.compat.version>
                <akka.version>2.4.17</akka.version>
            </properties>
        
        
            <dependencies>
                <!-- scala的依赖 -->
                <dependency>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                    <version>${scala.version}</version>
                </dependency>
        
                <!-- akka actor依赖 -->
                <dependency>
                    <groupId>com.typesafe.akka</groupId>
                    <artifactId>akka-actor_2.11</artifactId>
                    <version>${akka.version}</version>
                </dependency>
        
                <!-- akka远程通信依赖 -->
                <dependency>
                    <groupId>com.typesafe.akka</groupId>
                    <artifactId>akka-remote_2.11</artifactId>
                    <version>${akka.version}</version>
                </dependency>
        
          </dependencies>
        
      6. 联网等待maven下载依赖。

      7. maven安装步骤:https://www.cnblogs.com/sunleejon/p/12391093.html

      8. idea强制清除Maven缓存:https://www.cnblogs.com/-beyond/p/11557196.html

    1.5Akka RPC通信过程:

    1.启动Master,内部启动一个定时器,定期检测超时Worker
    2.启动所有Worker,Worker首先向Master建立连接,然后向Master注册,Worker把自身的信息发送给Master(ip,端口,资源)。
    3.Master接收到Worker发送的注册信息,然后将Worker的信息保存起来,然后向Worker反馈信息,告诉Worker注册成功了。
    4.Worker接收到了Master反馈注册成功信息,然后定期向Master发送心跳。
    5.Master接收Worker的心跳信息,然后定期更新对应Worker的上一次心跳时间。
    

    1.6 Akka创建Actor

    • Master.scala
    package com._xjk.rpc
    
    import akka.actor.{Actor, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    
    class Master extends Actor {
      /*
      * 接收消息
      * */
      override def receive: Receive = {
        case "hi" => {
          println("hi")
        }
        case "hello" => {
          println("hello")
        }
      }
    }
    
    
    //
    object Master {
      def main(args: Array[String]): Unit = {
        // 报错:Error:scalac:bad option: "-make:transitive"
        // 注销掉.idea/scala_compiler.xml下的 带有 -make:transitive 一行
        val host = args(0)
        val port = args(1).toInt
        // 字符串传入配置信息
        val confStr =
          s"""
            |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
            |akka.remote.netty.tcp.hostname="$host"
            |akka.remote.netty.tcp.port="$port"
          """.stripMargin
        val conf = ConfigFactory.parseString(confStr)
        // 创建一个 ActorSystem(创建管理Actor,单例)
        val actorSystem = ActorSystem("Master-Actor-System", conf)
        // 通过ActorSystem 创建Actor
        val master = actorSystem.actorOf(Props[Master], "Master-Acotory")
        // master给自己发送消息
        // ! 表示异步发送
        master ! "hello"
      }
    }
    
    • 配置Master.scala 的Program arguments,运行即可。
    localhost 8888
    

    1.7 Akka 中Master和Worker建立连接

    • Akka 中Master和Worker建立连接 并发送一条消息
    package com._xjk.rpc
    
    import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.{Config, ConfigFactory}
    
    class Worker extends Actor{
      var masterRef: ActorSelection = _
      // 让Worker跟Master建立连接:在Worker的构造方法之后,在receive方法之前。
      override def preStart(): Unit = {
        // 跟Master 建立连接, 拿到Master代理对象
        // 此处指定Master地址是akka的通信协议,是长链接,书写方式: 地址/user/Master名字
        masterRef = context.actorSelection("akka.tcp://Master-Actor-System@localhost:8888/user/Master-Actor")
        // 代理对象发送消息
        masterRef ! "hello"
      }
      // masterRef 为 代理对象,并不是 真正Master的引用,因为Master和Worker不在同一个进程中,Worker实际拿到是Master代理对象。
      override def receive: Receive = {
        case "hello" => {
          println("recv hello!")
        }
      }
    }
    
    
    object Worker {
      def main(args: Array[String]): Unit = {
        val host = args(0)
        val port = args(1).toInt
        // 字符串传入配置信息
        val confStr =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname="$host"
             |akka.remote.netty.tcp.port="$port"
          """.stripMargin
          val config: Config = ConfigFactory.parseString(confStr)
          // 创建ActorSystem
          val actorSystem = ActorSystem.apply("Worker-Actor-System", config)
          // 创建Actor, Props指定类型
          val worker = actorSystem.actorOf(Props[Worker], "Worker-Actor")
    //      worker ! "hello"
        }
    }
    
    • Worker将注册信息发送Master,定义case class里面放置信息。
    // 注册worker信息
    case class RegisterWorker(workerId:String, memory:Int, cores:Int)
    
    
    
    // Worker发送:
    val WORKER_ID = UUID.randomUUID().toString
    // 向Master发送注册消息: WorkerId,内存,核数等
    masterRef ! RegisterWorker(WORKER_ID, 10240, 24)
    
    // Master接收消息并打印:
    class Master extends Actor {
      /*
      * 接收消息
      * */
      override def receive: Receive = {
        case RegisterWorker(workerId, memory, cores) => {
          println(s"$workerId, $memory, $cores")// bc2ba2c8-c640-4d64-9b6a-01cbdfaf00f6, 10240, 24
        }
      }
    }
    
    
    • Master保存注册信息,并返回注册成功消息
    // Master.scala
    class Master extends Actor {
      val id2Worker = new mutable.HashMap[String, WorkerInfo]()
      val workers = new mutable.HashSet[WorkerInfo]()
      /*
      * 接收消息
      * */
      override def receive: Receive = {
        case RegisterWorker(workerId, memory, cores) => {
          println(s"$workerId, $memory, $cores")// bc2ba2c8-c640-4d64-9b6a-01cbdfaf00f6, 10240, 24
          val workerInfo = new WorkerInfo(workerId,memory,cores)
          // 保存Worker信息到Map
    //      id2Worker += ((workerId, workerInfo))
    //      id2Worker += (workerId -> workerInfo)
          id2Worker(workerId) = workerInfo
          // 保存Worker信息到Set中
          workers += workerInfo
          // 向Worker反馈成功的消息,获取消息发送者的引用(代理)
          // 消息接收者,可以通过sender方法获取消息发送者的连接
          sender() ! RegisterdWorker
        }
      }
    }
    
    // Worker.scala
    case RegisterdWorker => {
          println("worker 接收到注册成功消息!")
        }
    // WorkerInfo.scala
    package com._xjk.rpc
    
    class WorkerInfo(val workerId:String, var memory: Int, var cores:Int) {
    
    }
    // Messages
    package com._xjk.rpc
    // 注册worker信息
    case class RegisterWorker(workerId:String, memory:Int, cores:Int)
    // Master向 Worker发送注册成功消息
    case object RegisterdWorker
    
    • Worker定期向Master发送心跳。

    2.完整Akka练习

    • Master和Worker建立连接,Worker向Master发送注册消息,Master保存注册信息并返回注册成功消息,Worker定期向Master发送心跳,Master定期检测超时Worker并移除。
    • Master.scala
    package com._xjk.rpc
    
    import java.util.Date
    
    import akka.actor.{Actor, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    import scala.concurrent.duration._
    
    import scala.collection.mutable
    
    class Master extends Actor {
      val id2Worker = new mutable.HashMap[String, WorkerInfo]()
      val workers = new mutable.HashSet[WorkerInfo]()
      // 检测时间
      val CHECK_INTERVAL = 15000
      // 启动定时器,定期检测超时Worker
      override def preStart(): Unit = {
        // 导入隐式转换
        import context.dispatcher
        // 启动定时器
        context.system.scheduler.schedule(
          0 millis,
          CHECK_INTERVAL millis,
          self,
          CheckTimeoutWorker
        )
      }
    
      /*
      * 接收消息
      * */
      override def receive: Receive = {
        case RegisterWorker(workerId, memory, cores) => {
          println(s"$workerId, $memory, $cores")// bc2ba2c8-c640-4d64-9b6a-01cbdfaf00f6, 10240, 24
          val workerInfo = new WorkerInfo(workerId,memory,cores)
          // 保存Worker信息到Map
    //      id2Worker += ((workerId, workerInfo))
    //      id2Worker += (workerId -> workerInfo)
          id2Worker(workerId) = workerInfo
          // 保存Worker信息到Set中
          workers += workerInfo
          // 向Worker反馈成功的消息,获取消息发送者的引用(代理)
          // 消息接收者,可以通过sender方法获取消息发送者的连接
          sender() ! RegisterdWorker
        }
        // Worker发送给Master的心跳消息
        case Heartbeat(workerId) => {
          println(new Date() + "," + workerId)
          // 根据workerId取出对应worker的Info
          val info = id2Worker(workerId)
          // 获取当前时间:
          val currentTime = System.currentTimeMillis()
          // 更新workerInfo 的心跳时间
          info.lastHeartbeatTime = currentTime
    
        }
        // 发给自己消息,用于检查Worker超时消息
        case CheckTimeoutWorker => {
          // 获取当前时间:
          var currentTime = System.currentTimeMillis()
          // 过滤超时workers
          val deadWorkers = workers.filter(w => currentTime - w.lastHeartbeatTime > CHECK_INTERVAL)
          // 删除出现问题的worker
    //      for (workerInfo <- deadWorkers) {
    //        val workerId= workerInfo.workerId
    //        // 从map中删除
    //        id2Worker -= workerId
    //        // 从set中删除
    //        workers -= workerInfo
    //      }
          deadWorkers.foreach(workerInfo => {
            val workerId= workerInfo.workerId
            // 从map中删除
            id2Worker -= workerId
            // 从set中删除
            workers -= workerInfo
            println(s"当前worker的个数为:${id2Worker.size}")
          })
        }
      }
    }
    
    
    //
    object Master {
    
      val MASTER_ACTOR_SYSTEM = "Master-Actor-System"
      val MASTER_ACTOR_NAME = "Master-Actor"
    
      def main(args: Array[String]): Unit = {
        // 报错:Error:scalac:bad option: "-make:transitive"
        // 注销掉.idea/scala_compiler.xml下的 带有 -make:transitive 一行
        val host = args(0)
        val port = args(1).toInt
        // 字符串传入配置信息
        val confStr =
          s"""
            |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
            |akka.remote.netty.tcp.hostname="$host"
            |akka.remote.netty.tcp.port="$port"
          """.stripMargin
        val conf = ConfigFactory.parseString(confStr)
        // 创建一个 ActorSystem(创建管理Actor,单例)
        val actorSystem = ActorSystem(MASTER_ACTOR_SYSTEM, conf)
        // 通过ActorSystem 创建Actor
        val master = actorSystem.actorOf(Props[Master], MASTER_ACTOR_NAME)
        // master给自己发送消息
        // ! 表示异步发送
    //    master ! "hello"
      }
    }
    
    • Messages
    package com._xjk.rpc
    
    
    // 注册worker信息
    case class RegisterWorker(workerId:String, memory:Int, cores:Int)
    
    // Master向 Worker发送注册成功消息
    case object RegisterdWorker
    
    // Worker发给Master心跳消息
    case class Heartbeat(workerId:String)
    
    // Worker发送给自己的消息
    case object  SendHeartbeat
    
    
    // Master发送给自己检测Worker超时消息
    case object CheckTimeoutWorker
    
    • Worker.scala
    package com._xjk.rpc
    
    import java.util.UUID
    // 导入时间单位
    import scala.concurrent.duration._
    import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.{Config, ConfigFactory}
    
    class Worker (var masterHost:String, var masterPort:Int, var memory:Int, var cores:Int) extends Actor{
      var masterRef: ActorSelection = _
      // worker 的 ID
      val WORKER_ID = UUID.randomUUID().toString
      var HEARTBEAT_INTERVAL = 10000
      // 让Worker跟Master建立连接:在Worker的构造方法之后,在receive方法之前。
      override def preStart(): Unit = {
    
        // 跟Master 建立连接, 拿到Master代理对象
        masterRef = context.actorSelection(s"akka.tcp://${Master.MASTER_ACTOR_SYSTEM}@$masterHost:$masterPort/user/${Master.MASTER_ACTOR_NAME}")
        // 代理对象发送消息
    //    masterRef ! "hello"
        // 向Master发送注册消息: WorkerId,内存,核数等
        masterRef ! RegisterWorker(WORKER_ID, memory, cores)
      }
    
      override def receive: Receive = {
        case "hello" => {
          println("recv hello!")
        }
        // Master返回给Worker注册成功的消息
        case RegisterdWorker => {
          println("worker 接收到注册成功消息!")
          // 导入隐式转换
          import context.dispatcher
          // 上下文中定义延时0秒,每10秒给自己发送消息。self 表示发送给自己。
          context.system.scheduler.schedule(
            0 millis, HEARTBEAT_INTERVAL millis, self, SendHeartbeat
          )
        }
          // 接收自己发送消息
        case SendHeartbeat => {
          // 判断是否断开连接
          // 判断是否Master发生变化
          // 将心跳消息发送给master
          masterRef ! Heartbeat(WORKER_ID)
        }
      }
    }
    
    
    object Worker {
      val WORKER_ACTOR_SYSTEM = "Worker-Actor-System"
      val WORKER_ACTOR_NAME = "Worker-Actor"
      def main(args: Array[String]): Unit = {
        val masterHost = args(0)
        val masterHort = args(1).toInt
    
        val workerHost = args(2)
        val workerPort = args(3).toInt
    
        val workerMemory = args(4).toInt
        val workerCores = args(5).toInt
    
    
    
        // 字符串传入配置信息
        val confStr =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname="$workerHost"
             |akka.remote.netty.tcp.port="$workerPort"
          """.stripMargin
          val config: Config = ConfigFactory.parseString(confStr)
          // 创建ActorSystem
          val actorSystem = ActorSystem.apply(WORKER_ACTOR_SYSTEM, config)
          // 创建Actor, Props指定类型
          val worker = actorSystem.actorOf(Props(new Worker(masterHost, masterHort, workerMemory, workerCores)), WORKER_ACTOR_NAME)
    //      worker ! "hello"
        }
    }
    
    • WorkerInfo.scala
    package com._xjk.rpc
    
    class WorkerInfo(val workerId:String, var memory: Int, var cores:Int) {
      var lastHeartbeatTime: Long = _
    }
    
    
    • 通过maven打jar包.Lifecycle->package.(注意路径有中文会有错误)

    • 运行Master

    java -jar .AkkaRPC-Master-1.0.jar 127.0.0.1 8888
    
    • 运行Worker
    java -jar .AkkaRPC-Worker-1.0.jar 127.0.0.1 8888 127.0.0.1 9999 1024 2
    
  • 相关阅读:
    LeetCode 24. Swap Nodes in Pairs (两两交换链表中的节点)
    LeetCode 1041. Robot Bounded In Circle (困于环中的机器人)
    LeetCode 1037. Valid Boomerang (有效的回旋镖)
    LeetCode 1108. Defanging an IP Address (IP 地址无效化)
    LeetCode 704. Binary Search (二分查找)
    LeetCode 744. Find Smallest Letter Greater Than Target (寻找比目标字母大的最小字母)
    LeetCode 852. Peak Index in a Mountain Array (山脉数组的峰顶索引)
    LeetCode 817. Linked List Components (链表组件)
    LeetCode 1019. Next Greater Node In Linked List (链表中的下一个更大节点)
    29. Divide Two Integers
  • 原文地址:https://www.cnblogs.com/xujunkai/p/14413186.html
Copyright © 2011-2022 走看看