zoukankan      html  css  js  c++  java
  • scala快速入门04

    Scala从入门开始04

    1.分布式通信框架Akka

    • 对底层异步IO(NIO)封装,使用起来方便。Java中有Netty,Mina。Scala有Akka。

    1.1什么是ACTORS

    • Actor是用来收发消息的,一个Actor就是一个实例,可以创建多个Actor实现并发。所有的Actor都有管理者,它用于创建Actor和管理Actor。以后根据发送带类型的消息进行匹配,处理不同逻辑。用到Scala实现用到技术点:case class/case object。 Akka Actors遵循Actor模型,Actors并发编程不需要锁,而是通过通信的机制实现并发。

    1.2什么是Akka

    • Akka是JAVA虚拟机JVM平台上构建高并发、分布式和容错应用的工具包。Akka用Scala语言写成,同时提供了Scala和JAVA的开发接口。

      Akka处理并发的方法基于Actor模型。在Akka里,Actor之间通信的唯一机制就是消息传递

    1.3Akka的特点

    1. 单并发性  和  分布式
    2. 可恢复的,弹性的
    3. 高效的
    4. 弹性,去中心化(分散)
    5. 可扩展
    

    1.4 Akka快速入门

    • 负责管理的角色:ActorSystem

    • 负责通信:Actor

    • 发送的消息:case class 和 case object

    • 发消息的方式: 异步、同步

    • 示例演示步骤:

      1. 创建一个Maven工程

        • idea中New --> project--> Maven --> Next --> 填写项目名
      2. 当前创建项目为Java项目,需要导入scala插件

        <build>
                <sourceDirectory>src/main/scala</sourceDirectory>
                <testSourceDirectory>src/test/scala</testSourceDirectory>
                <plugins>
                    <!-- 编译scala -->
                    <plugin>
                        <groupId>net.alchim31.maven</groupId>
                        <artifactId>scala-maven-plugin</artifactId>
                        <version>3.2.2</version>
                        <executions>
                            <execution>
                                <goals>
                                    <goal>compile</goal>
                                    <goal>testCompile</goal>
                                </goals>
                                <configuration>
                                    <args>
        <!--                                <arg>-make:transitive</arg>-->
                                        <arg>-dependencyfile</arg>
                                        <arg>${project.build.directory}/.scala_dependencies</arg>
                                    </args>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>
        
                    <!-- 打包插件 -->
                    <plugin>
                        <groupId>org.apache.maven.plugins</groupId>
                        <artifactId>maven-shade-plugin</artifactId>
                        <version>2.4.3</version>
                        <executions>
                            <execution>
                                <phase>package</phase>
                                <goals>
                                    <goal>shade</goal>
                                </goals>
                                <configuration>
                                    <filters>
                                        <filter>
                                            <artifact>*:*</artifact>
                                            <excludes>
                                                <exclude>META-INF/*.SF</exclude>
                                                <exclude>META-INF/*.DSA</exclude>
                                                <exclude>META-INF/*.RSA</exclude>
                                            </excludes>
                                        </filter>
                                    </filters>
                                    <transformers>
                                        <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                            <resource>reference.conf</resource>
                                        </transformer>
                                        <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                            <mainClass>cn._51doit.rpc.Master</mainClass>
                                        </transformer>
                                    </transformers>
                                </configuration>
                            </execution>
                        </executions>
                    </plugin>
                </plugins>
            </build>
        
      3. 根据导入scala插件创建目录:main下创建scala文件夹,右键Make Directory as ---> Sources root。test下创建scala文件夹。右键Make Directory as ---> Sources test root。

      4. 右键项目->Add Frameworks Support->选择scala

      5. 引入scala依赖

        <!-- 导入scala的依赖 -->
            <properties>
                <maven.compiler.source>1.8</maven.compiler.source>
                <maven.compiler.target>1.8</maven.compiler.target>
                <encoding>UTF-8</encoding>
                <scala.version>2.11.12</scala.version>
                <scala.compat.version>2.11</scala.compat.version>
                <akka.version>2.4.17</akka.version>
            </properties>
        
        
            <dependencies>
                <!-- scala的依赖 -->
                <dependency>
                    <groupId>org.scala-lang</groupId>
                    <artifactId>scala-library</artifactId>
                    <version>${scala.version}</version>
                </dependency>
        
                <!-- akka actor依赖 -->
                <dependency>
                    <groupId>com.typesafe.akka</groupId>
                    <artifactId>akka-actor_2.11</artifactId>
                    <version>${akka.version}</version>
                </dependency>
        
                <!-- akka远程通信依赖 -->
                <dependency>
                    <groupId>com.typesafe.akka</groupId>
                    <artifactId>akka-remote_2.11</artifactId>
                    <version>${akka.version}</version>
                </dependency>
        
          </dependencies>
        
      6. 联网等待maven下载依赖。

      7. maven安装步骤:https://www.cnblogs.com/sunleejon/p/12391093.html

      8. idea强制清除Maven缓存:https://www.cnblogs.com/-beyond/p/11557196.html

    1.5Akka RPC通信过程:

    1.启动Master,内部启动一个定时器,定期检测超时Worker
    2.启动所有Worker,Worker首先向Master建立连接,然后向Master注册,Worker把自身的信息发送给Master(ip,端口,资源)。
    3.Master接收到Worker发送的注册信息,然后将Worker的信息保存起来,然后向Worker反馈信息,告诉Worker注册成功了。
    4.Worker接收到了Master反馈注册成功信息,然后定期向Master发送心跳。
    5.Master接收Worker的心跳信息,然后定期更新对应Worker的上一次心跳时间。
    

    1.6 Akka创建Actor

    • Master.scala
    package com._xjk.rpc
    
    import akka.actor.{Actor, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    
    class Master extends Actor {
      /*
      * 接收消息
      * */
      override def receive: Receive = {
        case "hi" => {
          println("hi")
        }
        case "hello" => {
          println("hello")
        }
      }
    }
    
    
    //
    object Master {
      def main(args: Array[String]): Unit = {
        // 报错:Error:scalac:bad option: "-make:transitive"
        // 注销掉.idea/scala_compiler.xml下的 带有 -make:transitive 一行
        val host = args(0)
        val port = args(1).toInt
        // 字符串传入配置信息
        val confStr =
          s"""
            |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
            |akka.remote.netty.tcp.hostname="$host"
            |akka.remote.netty.tcp.port="$port"
          """.stripMargin
        val conf = ConfigFactory.parseString(confStr)
        // 创建一个 ActorSystem(创建管理Actor,单例)
        val actorSystem = ActorSystem("Master-Actor-System", conf)
        // 通过ActorSystem 创建Actor
        val master = actorSystem.actorOf(Props[Master], "Master-Acotory")
        // master给自己发送消息
        // ! 表示异步发送
        master ! "hello"
      }
    }
    
    • 配置Master.scala 的Program arguments,运行即可。
    localhost 8888
    

    1.7 Akka 中Master和Worker建立连接

    • Akka 中Master和Worker建立连接 并发送一条消息
    package com._xjk.rpc
    
    import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.{Config, ConfigFactory}
    
    class Worker extends Actor{
      var masterRef: ActorSelection = _
      // 让Worker跟Master建立连接:在Worker的构造方法之后,在receive方法之前。
      override def preStart(): Unit = {
        // 跟Master 建立连接, 拿到Master代理对象
        // 此处指定Master地址是akka的通信协议,是长链接,书写方式: 地址/user/Master名字
        masterRef = context.actorSelection("akka.tcp://Master-Actor-System@localhost:8888/user/Master-Actor")
        // 代理对象发送消息
        masterRef ! "hello"
      }
      // masterRef 为 代理对象,并不是 真正Master的引用,因为Master和Worker不在同一个进程中,Worker实际拿到是Master代理对象。
      override def receive: Receive = {
        case "hello" => {
          println("recv hello!")
        }
      }
    }
    
    
    object Worker {
      def main(args: Array[String]): Unit = {
        val host = args(0)
        val port = args(1).toInt
        // 字符串传入配置信息
        val confStr =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname="$host"
             |akka.remote.netty.tcp.port="$port"
          """.stripMargin
          val config: Config = ConfigFactory.parseString(confStr)
          // 创建ActorSystem
          val actorSystem = ActorSystem.apply("Worker-Actor-System", config)
          // 创建Actor, Props指定类型
          val worker = actorSystem.actorOf(Props[Worker], "Worker-Actor")
    //      worker ! "hello"
        }
    }
    
    • Worker将注册信息发送Master,定义case class里面放置信息。
    // 注册worker信息
    case class RegisterWorker(workerId:String, memory:Int, cores:Int)
    
    
    
    // Worker发送:
    val WORKER_ID = UUID.randomUUID().toString
    // 向Master发送注册消息: WorkerId,内存,核数等
    masterRef ! RegisterWorker(WORKER_ID, 10240, 24)
    
    // Master接收消息并打印:
    class Master extends Actor {
      /*
      * 接收消息
      * */
      override def receive: Receive = {
        case RegisterWorker(workerId, memory, cores) => {
          println(s"$workerId, $memory, $cores")// bc2ba2c8-c640-4d64-9b6a-01cbdfaf00f6, 10240, 24
        }
      }
    }
    
    
    • Master保存注册信息,并返回注册成功消息
    // Master.scala
    class Master extends Actor {
      val id2Worker = new mutable.HashMap[String, WorkerInfo]()
      val workers = new mutable.HashSet[WorkerInfo]()
      /*
      * 接收消息
      * */
      override def receive: Receive = {
        case RegisterWorker(workerId, memory, cores) => {
          println(s"$workerId, $memory, $cores")// bc2ba2c8-c640-4d64-9b6a-01cbdfaf00f6, 10240, 24
          val workerInfo = new WorkerInfo(workerId,memory,cores)
          // 保存Worker信息到Map
    //      id2Worker += ((workerId, workerInfo))
    //      id2Worker += (workerId -> workerInfo)
          id2Worker(workerId) = workerInfo
          // 保存Worker信息到Set中
          workers += workerInfo
          // 向Worker反馈成功的消息,获取消息发送者的引用(代理)
          // 消息接收者,可以通过sender方法获取消息发送者的连接
          sender() ! RegisterdWorker
        }
      }
    }
    
    // Worker.scala
    case RegisterdWorker => {
          println("worker 接收到注册成功消息!")
        }
    // WorkerInfo.scala
    package com._xjk.rpc
    
    class WorkerInfo(val workerId:String, var memory: Int, var cores:Int) {
    
    }
    // Messages
    package com._xjk.rpc
    // 注册worker信息
    case class RegisterWorker(workerId:String, memory:Int, cores:Int)
    // Master向 Worker发送注册成功消息
    case object RegisterdWorker
    
    • Worker定期向Master发送心跳。

    2.完整Akka练习

    • Master和Worker建立连接,Worker向Master发送注册消息,Master保存注册信息并返回注册成功消息,Worker定期向Master发送心跳,Master定期检测超时Worker并移除。
    • Master.scala
    package com._xjk.rpc
    
    import java.util.Date
    
    import akka.actor.{Actor, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    import scala.concurrent.duration._
    
    import scala.collection.mutable
    
    class Master extends Actor {
      val id2Worker = new mutable.HashMap[String, WorkerInfo]()
      val workers = new mutable.HashSet[WorkerInfo]()
      // 检测时间
      val CHECK_INTERVAL = 15000
      // 启动定时器,定期检测超时Worker
      override def preStart(): Unit = {
        // 导入隐式转换
        import context.dispatcher
        // 启动定时器
        context.system.scheduler.schedule(
          0 millis,
          CHECK_INTERVAL millis,
          self,
          CheckTimeoutWorker
        )
      }
    
      /*
      * 接收消息
      * */
      override def receive: Receive = {
        case RegisterWorker(workerId, memory, cores) => {
          println(s"$workerId, $memory, $cores")// bc2ba2c8-c640-4d64-9b6a-01cbdfaf00f6, 10240, 24
          val workerInfo = new WorkerInfo(workerId,memory,cores)
          // 保存Worker信息到Map
    //      id2Worker += ((workerId, workerInfo))
    //      id2Worker += (workerId -> workerInfo)
          id2Worker(workerId) = workerInfo
          // 保存Worker信息到Set中
          workers += workerInfo
          // 向Worker反馈成功的消息,获取消息发送者的引用(代理)
          // 消息接收者,可以通过sender方法获取消息发送者的连接
          sender() ! RegisterdWorker
        }
        // Worker发送给Master的心跳消息
        case Heartbeat(workerId) => {
          println(new Date() + "," + workerId)
          // 根据workerId取出对应worker的Info
          val info = id2Worker(workerId)
          // 获取当前时间:
          val currentTime = System.currentTimeMillis()
          // 更新workerInfo 的心跳时间
          info.lastHeartbeatTime = currentTime
    
        }
        // 发给自己消息,用于检查Worker超时消息
        case CheckTimeoutWorker => {
          // 获取当前时间:
          var currentTime = System.currentTimeMillis()
          // 过滤超时workers
          val deadWorkers = workers.filter(w => currentTime - w.lastHeartbeatTime > CHECK_INTERVAL)
          // 删除出现问题的worker
    //      for (workerInfo <- deadWorkers) {
    //        val workerId= workerInfo.workerId
    //        // 从map中删除
    //        id2Worker -= workerId
    //        // 从set中删除
    //        workers -= workerInfo
    //      }
          deadWorkers.foreach(workerInfo => {
            val workerId= workerInfo.workerId
            // 从map中删除
            id2Worker -= workerId
            // 从set中删除
            workers -= workerInfo
            println(s"当前worker的个数为:${id2Worker.size}")
          })
        }
      }
    }
    
    
    //
    object Master {
    
      val MASTER_ACTOR_SYSTEM = "Master-Actor-System"
      val MASTER_ACTOR_NAME = "Master-Actor"
    
      def main(args: Array[String]): Unit = {
        // 报错:Error:scalac:bad option: "-make:transitive"
        // 注销掉.idea/scala_compiler.xml下的 带有 -make:transitive 一行
        val host = args(0)
        val port = args(1).toInt
        // 字符串传入配置信息
        val confStr =
          s"""
            |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
            |akka.remote.netty.tcp.hostname="$host"
            |akka.remote.netty.tcp.port="$port"
          """.stripMargin
        val conf = ConfigFactory.parseString(confStr)
        // 创建一个 ActorSystem(创建管理Actor,单例)
        val actorSystem = ActorSystem(MASTER_ACTOR_SYSTEM, conf)
        // 通过ActorSystem 创建Actor
        val master = actorSystem.actorOf(Props[Master], MASTER_ACTOR_NAME)
        // master给自己发送消息
        // ! 表示异步发送
    //    master ! "hello"
      }
    }
    
    • Messages
    package com._xjk.rpc
    
    
    // 注册worker信息
    case class RegisterWorker(workerId:String, memory:Int, cores:Int)
    
    // Master向 Worker发送注册成功消息
    case object RegisterdWorker
    
    // Worker发给Master心跳消息
    case class Heartbeat(workerId:String)
    
    // Worker发送给自己的消息
    case object  SendHeartbeat
    
    
    // Master发送给自己检测Worker超时消息
    case object CheckTimeoutWorker
    
    • Worker.scala
    package com._xjk.rpc
    
    import java.util.UUID
    // 导入时间单位
    import scala.concurrent.duration._
    import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.{Config, ConfigFactory}
    
    class Worker (var masterHost:String, var masterPort:Int, var memory:Int, var cores:Int) extends Actor{
      var masterRef: ActorSelection = _
      // worker 的 ID
      val WORKER_ID = UUID.randomUUID().toString
      var HEARTBEAT_INTERVAL = 10000
      // 让Worker跟Master建立连接:在Worker的构造方法之后,在receive方法之前。
      override def preStart(): Unit = {
    
        // 跟Master 建立连接, 拿到Master代理对象
        masterRef = context.actorSelection(s"akka.tcp://${Master.MASTER_ACTOR_SYSTEM}@$masterHost:$masterPort/user/${Master.MASTER_ACTOR_NAME}")
        // 代理对象发送消息
    //    masterRef ! "hello"
        // 向Master发送注册消息: WorkerId,内存,核数等
        masterRef ! RegisterWorker(WORKER_ID, memory, cores)
      }
    
      override def receive: Receive = {
        case "hello" => {
          println("recv hello!")
        }
        // Master返回给Worker注册成功的消息
        case RegisterdWorker => {
          println("worker 接收到注册成功消息!")
          // 导入隐式转换
          import context.dispatcher
          // 上下文中定义延时0秒,每10秒给自己发送消息。self 表示发送给自己。
          context.system.scheduler.schedule(
            0 millis, HEARTBEAT_INTERVAL millis, self, SendHeartbeat
          )
        }
          // 接收自己发送消息
        case SendHeartbeat => {
          // 判断是否断开连接
          // 判断是否Master发生变化
          // 将心跳消息发送给master
          masterRef ! Heartbeat(WORKER_ID)
        }
      }
    }
    
    
    object Worker {
      val WORKER_ACTOR_SYSTEM = "Worker-Actor-System"
      val WORKER_ACTOR_NAME = "Worker-Actor"
      def main(args: Array[String]): Unit = {
        val masterHost = args(0)
        val masterHort = args(1).toInt
    
        val workerHost = args(2)
        val workerPort = args(3).toInt
    
        val workerMemory = args(4).toInt
        val workerCores = args(5).toInt
    
    
    
        // 字符串传入配置信息
        val confStr =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname="$workerHost"
             |akka.remote.netty.tcp.port="$workerPort"
          """.stripMargin
          val config: Config = ConfigFactory.parseString(confStr)
          // 创建ActorSystem
          val actorSystem = ActorSystem.apply(WORKER_ACTOR_SYSTEM, config)
          // 创建Actor, Props指定类型
          val worker = actorSystem.actorOf(Props(new Worker(masterHost, masterHort, workerMemory, workerCores)), WORKER_ACTOR_NAME)
    //      worker ! "hello"
        }
    }
    
    • WorkerInfo.scala
    package com._xjk.rpc
    
    class WorkerInfo(val workerId:String, var memory: Int, var cores:Int) {
      var lastHeartbeatTime: Long = _
    }
    
    
    • 通过maven打jar包.Lifecycle->package.(注意路径有中文会有错误)

    • 运行Master

    java -jar .AkkaRPC-Master-1.0.jar 127.0.0.1 8888
    
    • 运行Worker
    java -jar .AkkaRPC-Worker-1.0.jar 127.0.0.1 8888 127.0.0.1 9999 1024 2
    
  • 相关阅读:
    Android入门:监听ContentProvider数据改变
    Android入门:ListView(SimpleCursorAdapter实现)
    Android入门:ContentProvider
    Android入门:ListView(继承BaseAdapter实现)
    随机生成字符串实现
    很惊讶douban.com是用python语言的一个框架写的
    简单的总是好的,在这个复杂的世界: java simple log
    Quixtoe比PHP更简单吗?
    python 与 ruby (ruby学习资源大全)
    Web的未来:语义网
  • 原文地址:https://www.cnblogs.com/xujunkai/p/14413186.html
Copyright © 2011-2022 走看看