zoukankan      html  css  js  c++  java
  • Scala学习之路 (十)Scala的Actor

    一、Scala中的并发编程

    1、Java中的并发编程

    ①Java中的并发编程基本上满足了事件之间相互独立,但是事件能够同时发生的场景的需要。 

    ②Java中的并发编程是基于共享数据和加锁的一种机制,即会有一个共享的数据,然后有若干个线程去访问这个共享的数据(主要是对这个共享的数据进行修改),同时Java利用加锁的机制(即synchronized)来确保同一时间只有一个线程对我们的共享数据进行访问,进而保证共享数据的一致性。 

    ③Java中的并发编程存在资源争夺和死锁等多种问题,因此程序越大问题越麻烦。 

    2、Scala中的并发编程

    ①Scala中的并发编程思想与Java中的并发编程思想完全不一样,Scala中的Actor是一种不共享数据,依赖于消息传递的一种并发编程模式, 避免了死锁、资源争夺等情况。在具体实现的过程中,Scala中的Actor会不断的循环自己的邮箱,并通过receive偏函数进行消息的模式匹配并进行相应的处理。 

    ②如果Actor A和 Actor B要相互沟通的话,首先A要给B传递一个消息,B会有一个收件箱,然后B会不断的循环自己的收件箱, 若看见A发过来的消息,B就会解析A的消息并执行,处理完之后就有可能将处理的结果通过邮件的方式发送给A。

    二、Scala中的Actor

    1、什么是Actor

    一个actor是一个容器,它包含 状态, 行为,信箱,子Actor 和 监管策略,所有这些包含在一个ActorReference(Actor引用)里。一个actor需要与外界隔离才能从actor模型中获益,所以actor是以actor引用的形式展现给外界的

    2、ActorSystem的层次结构

    如果一个Actor中的业务逻辑非常复杂,为了降低代码的复杂度,可以将其拆分成多个子任务(在一个actor的内部可以创建一个或多个actor,actor的创建者也是该actor的监控者) 

    一个ActorSystem应该被正确规划,例如哪一个Actor负责监控,监控什么等等:

      • 负责分发的actor管理接受任务的actor
      • 拥有重要数据的actor,找出所有可能丢失数据的子actor,并且处理他们的错误。

    3、ActorPath

    ActorPath是通过字符串描述Actor的层级关系,并唯一标识一个Actor的方法。

    ActorPath包含协议,位置 和 Actor层级关系

    //本地path
    "akka://my-sys/user/service-a/worker1"   
    
    //远程path  akka.tcp://(ActorSystem的名称)@(远程地址的IP):(远程地址的端口)/user/(Actor的名称)
    "akka.tcp://my-sys@host.example.com:5678/user/service-b" 
    
    //akka集群
    "cluster://my-cluster/service-c"

     远程地址不清楚是多少的话,可以在远程的服务启动的时候查看

    4、获取Actor Reference

    获取Actor引用的方式有两种:创建 和 查找。 

    要创建Actor,可以调用ActorSystem.actorOf(..),它创建的actor在guardian actor之下,接着可以调用ActorContext的actorOf(…) 在刚才创建的Actor内生成一个actor树。这些方法会返回新创建的actor的引用,每一个actor都可以通过访问ActorContext来获得自己(self),子Actor(children,child)和父actor(parent)。


    要查找Actor Reference,可以调用ActorSystem或ActorContext的actorSelection(“path”),在查找ActorRef时,可以使用相对路径或绝对路径,如果是相对路径,可以用 .. 来表示parent actor。

    actorOf / actorSelection / actorFor的区别

    • actorOf 创建一个新的actor,创建的actor为调用该方法所属的context的直接子actor。

    • actorSelection 查找现有actor,并不会创建新的actor。

    • actorFor 查找现有actor,不创建新的actor,已过时。

    5、Actor和ActorSystem

    Actor:
    就是用来做消息传递的
    用来接收和发送消息的,一个actor就相当于是一个老师或者是学生。
    如果我们想要多个老师,或者学生,就需要创建多个actor实例。
    ActorSystem:
    用来创建和管理actor,并且还需要监控Actor。ActorSystem是单例的(object)
    在同一个进程里面,只需要一个ActorSystem就可以了

    三、Actor的示例

    1、示例说明

    2、代码实现

    MyResourceManager.scala(服务端)

    package com.rpc
    
    import akka.actor._
    import com.typesafe.config.{Config, ConfigFactory}
    
    import scala.collection.mutable
    
    class MyResourceManager(var resourceManagerHostName:String, var resourceManagerPort:Int) extends Actor {
      /**
        * 定义一个Map,接受MyNodeManager的注册信息,key是主机名,
        * value是NodeManagerInfo对象,里面存储主机名、CPU和内存信息
        * */
      var registerMap = new mutable.HashMap[String,NodeManagerInfo]()
      /**
        * 定义一个Set,接受MyNodeManager的注册信息,key是主机名,
        * value是NodeManagerInfo对象,里面存储主机名、CPU和内存信息
        * 实际上和上面的Map里面存档内容一样,容易变历,可以不用写,主要是模仿后面Spark里面的内容
        * 方便到时理解Spark源码
        * */
      var registerSet = new mutable.HashSet[NodeManagerInfo]()
    
    
      override def preStart(): Unit = {
        import scala.concurrent.duration._
        import context.dispatcher
        context.system.scheduler.schedule(0 millis, 5000 millis, self,CheckTimeOut)
      }
    
      //对MyNodeManager传过来的信息进行匹配
      override def receive: Receive = {
        //匹配到NodeManager的注册信息进行对应处理
        case NodeManagerRegisterMsg(nodeManagerID,cpu,memory) => {
          //将注册信息实例化为一个NodeManagerInfo对象
          val registerMsg = new NodeManagerInfo(nodeManagerID,cpu,memory)
          //将注册信息存储到registerMap和registerSet里面,key是主机名,value是NodeManagerInfo对象
          registerMap.put(nodeManagerID,registerMsg)
          registerSet += registerMsg
          //注册成功之后,反馈个MyNodeManager一个成功的信息
          sender() ! new RegisterFeedbackMsg("注册成功!" + resourceManagerHostName+":"+resourceManagerPort)
        }
        //匹配到心跳信息做相应处理
        case HeartBeat(nodeManagerID) => {
          //获取当前时间
          val time:Long = System.currentTimeMillis()
          //根据nodeManagerID获取NodeManagerInfo对象
          val info = registerMap(nodeManagerID)
          info.lastHeartBeatTime = time
          //更新registerMap和registerSet里面nodeManagerID对应的NodeManagerInfo对象信息(最后一次心跳时间)
          registerMap(nodeManagerID) = info
          registerSet += info
        }
        //检测超时,对超时的数据从集合中删除
        case CheckTimeOut => {
          var time = System.currentTimeMillis()
          registerSet
            .filter( nm => time - nm.lastHeartBeatTime > 10000)
            .foreach(deadnm => {
              registerSet -= deadnm
              registerMap.remove(deadnm.nodeManagerID)
            })
          println("当前注册成功的节点数:" + registerMap.size)
        }
      }
    }
    
    object MyResourceManager {
      def main(args: Array[String]): Unit = {
        /**
          * 传参:
          *   ResourceManager的主机地址、端口号
          * */
        val RM_HOSTNAME = args(0)
        val RM_PORT = args(1).toInt
    
        val str:String =
          """
            |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
            |akka.remote.netty.tcp.hostname =localhost
            |akka.remote.netty.tcp.port=19888
          """.stripMargin
        val conf: Config = ConfigFactory.parseString(str)
        val actorSystem = ActorSystem(Conf.RMAS,conf)
        actorSystem.actorOf(Props(new MyResourceManager(RM_HOSTNAME,RM_PORT)),Conf.RMA)
      }
    }
    View Code

    MyNodeManager.scala(客户端)

    package com.rpc
    
    import java.util.UUID
    
    import akka.actor._
    import com.typesafe.config.{Config, ConfigFactory}
    
    class MyNodeManager(resourceManagerHostName:String,resourceManagerPort:Int,cpu:Int,memory:Int) extends Actor{
      //MyNodeManager的UUID
      var nodeManagerID:String = _
      var rmref:ActorSelection = _
      override def preStart(): Unit = {
        //获取MyResourceManager的Actor的引用
        rmref = context.actorSelection(s"akka.tcp://${Conf.RMAS}@${resourceManagerHostName}:${resourceManagerPort}/user/${Conf.RMA}")
        //生成随机的UUID
        nodeManagerID = UUID.randomUUID().toString
        /**
          * 向MyResourceManager发送注册信息
          * */
        rmref ! NodeManagerRegisterMsg(nodeManagerID,cpu,memory)
    
      }
      //进行信息匹配
      override def receive: Receive = {
        //匹配到注册成功之后MyResourceManager反馈回的信息,进行相应处理
        case RegisterFeedbackMsg(feedbackMsg) => {
          /**
            * initialDelay: FiniteDuration, 多久以后开始执行
            * interval:     FiniteDuration, 每隔多长时间执行一次
            * receiver:     ActorRef, 给谁发送这个消息
            * message:      Any  发送的消息是啥
            */
          //定时任务需要导入的工具包
          import scala.concurrent.duration._
          import context.dispatcher
          //定时向自己发送信息
          context.system.scheduler.schedule(0 millis, 3000 millis, self, SendMessage)
        }
        //匹配到SendMessage信息之后做相应处理
        case SendMessage => {
          //向MyResourceManager发送心跳信息
          rmref ! HeartBeat(nodeManagerID)
          println(Thread.currentThread().getId + ":" + System.currentTimeMillis())
        }
      }
    }
    
    object MyNodeManager {
      def main(args: Array[String]): Unit = {
        /**
          * 传参:
          *   NodeManager的主机地址、端口号、CPU、内存
          *   ResourceManager的主机地址、端口号
          * */
        val NM_HOSTNAME = args(0)
        val NM_PORT = args(1)
        val NM_CPU:Int = args(2).toInt
        val NM_MEMORY:Int = args(3).toInt
    
        val RM_HOSTNAME = args(4)
        val RM_PORT = args(5).toInt
    
        val str:String =
          s"""
            |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
            |akka.remote.netty.tcp.hostname = ${NM_HOSTNAME}
            |akka.remote.netty.tcp.port = ${NM_PORT}
          """.stripMargin
        val conf: Config = ConfigFactory.parseString(str)
        val actorSystem = ActorSystem(Conf.NMAS,conf)
        actorSystem.actorOf(Props(new MyNodeManager(RM_HOSTNAME,RM_PORT,NM_CPU,NM_MEMORY)),Conf.NMA)
      }
    }
    View Code

    Conf.scala(配置文件)

    package com.rpc
    
    //避免硬编码
    object Conf {
      //ResourceManagerActorSystem
      val RMAS = "MyRMActorSystem"
      //ResourceManagerActor
      val RMA = "MyRMActor"
      //NodeManagerActorSystem
      val NMAS = "MyNMActorSystem"
      //NodeManagerActor
      val NMA = "MyNMactor"
    }
    View Code

    Message.scala

    package com.rpc
    //NodeManager注册信息
    case class NodeManagerRegisterMsg(val nodeManagerID:String, var cpu:Int, var memory:Int)
    //ResourceManager接收到注册信息成功之后的返回信息
    case class RegisterFeedbackMsg(val feedbackMsg: String)
    //NodeManager的心跳信息
    case class HeartBeat(val nodeManagerID:String)
    //NodeManager注册信息
    class NodeManagerInfo(val nodeManagerID:String, var cpu:Int, var memory:Int){
      //定义一个属性,存储上一次的心跳时间
      var lastHeartBeatTime:Long = _
    }
    
    case object SendMessage
    case object CheckTimeOut
    View Code

    3、运行

    (1)运行MyResourceManager

    运行结果

    发现报错数组越界,原因是在启动时需要传入2个参数

    重新启动,启动成功

    2、运行MyNodeManager

    报相同的错误,不过此处需要传入6个参数

    重新启动,启动成功

    3、观察MyResourceManager

    发现有一个节点连接成功

    4、再启动一个MyNodeManager观察情况

    先修改MyNodeManager配置里面的端口

    再启动

    启动成功之后观察MyResourceManager,此时有2个节点连接成功

    5、关闭一个节点,观察情况

    集合中连接超时的成功删除

  • 相关阅读:
    idou老师教你学Istio 08: 调用链埋点是否真的“零修改”?
    idou老师教你学Istio 07: 如何用istio实现请求超时管理
    idou老师教你学Istio06: 如何用istio实现流量迁移
    Strusts2笔记8--文件的上传和下载
    Strusts2笔记7--国际化
    Strusts2笔记6--拦截器
    Strusts2笔记5--数据验证
    Strusts2笔记4--类型转换器
    Struts2笔记3--获取ServletAPI和OGNL与值栈
    Struts2笔记2--动态方法调用和Action接收请求方式
  • 原文地址:https://www.cnblogs.com/qingyunzong/p/8886061.html
Copyright © 2011-2022 走看看