zoukankan      html  css  js  c++  java
  • Actor和AKKA的使用

    添加需要的maven依赖

    <dependency>
        <groupId>com.typesafe</groupId>
        <artifactId>ssl-config-akka_2.11</artifactId>
        <version>0.1.2</version>
    </dependency>
    
    <!-- 添加scala的依赖 -->
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    
    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-slf4j_2.11</artifactId>
        <version>2.5.23</version>
    </dependency>
    
    <!-- 添加akka的actor依赖 -->
    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-actor_2.11</artifactId>
        <version>2.5.23</version>
    </dependency>
    
    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-stream_2.11</artifactId>
        <version>2.5.23</version>
    </dependency>
    
    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-remote_2.11</artifactId>
        <version>2.5.23</version>
    </dependency>
    

    Actor (线程)

    Actor信息传递

    import scala.actors.{Actor, Future}
    
    object ActorDM {
      def main(args: Array[String]): Unit = {
    
        // 创建对象MyActor
        val myActor1 = new MyActor()
        // 启动线程
        myActor1.start()
    
        // 发送异步消息,没有返回值
        myActor1 ! "this is one info!"
    
        // 发送同步消息,阻塞等待返回值
        val result: Any = myActor1 !? "this is two info!"
        println(result)
    
        // 发送异步消息, 有返回值
        val future: Future[Any] = myActor1 !! "this is three info"
        // 等待3秒,
        Thread.sleep(3000)
        // 判断是否有值返回
        if(future.isSet){
          // 取出返回的值
          println(future.apply())
        }else{
          println("None")
        }
    
        // 发送对象
        myActor1 ! new AC("ac name")
    
        // 关闭线程
        myActor1 ! "stop"
    
    
      }
    
      case class AC(name: String) {}
    
      class MyActor extends Actor {
        
        // 重写act方法,类似java的Thread的run方法
        override def act(): Unit = {
    
          var flag: Boolean = true
    
          while (flag){
            receive{
              // 接收字符串
              case str: String => {
                if(str.equals("stop")){
                  flag = false
                }
                println(s"接收的信息是: $str")
                sender ! s"发送的 $str 已收到!"
              }
                // 接收 AC()对象
              case AC(name) => println(s"AC name = $name")
            }
          }
        }
      }
    }
    
    接收的信息是: this is one info!
    接收的信息是: this is two info!
    发送的 this is two info! 已收到!
    接收的信息是: this is three info
    发送的 this is three info 已收到!
    AC name = ac name
    接收的信息是: stop
    

    Actor信息互传

    def main(args: Array[String]): Unit = {
        val teacher = new Teacher()
        teacher.start()
        val student = new Student(teacher)
        student.start()
    
        student ! Request("Hi teacher!")
    
    }
    
    case class Request(question: String) {}
    
    case class Response(answer: String) {}
    
    // student线程, 传入teacher
    class Student(teacher: Teacher) extends Actor {
        override def act(): Unit = {
            while (true) {
                receive {
                    // 接收学生的问题, 将问题内容发送给老师.
                    case Request(question) => teacher ! Request(question)
                    // 接收老师的回答, 打印回答信息
                    case Response(answer) => println(s"teacher answer is : $answer")
                }
            }
        }
    }
    
    class Teacher extends Actor {
        override def act(): Unit = {
            while (true) {
                receive {
                    // 接收问题, 回应发送者.
                    case Request(question) => sender ! Response("I am teacher this is my answer!")
                }
            }
        }
    }
    
    teacher answer is I am teacher this is my answer!
    

    AkkA

    信息发送

    import akka.actor.{Actor, ActorSystem, Props}
    
    class HelloActor extends Actor{
      // 重写接受消息的偏函数,其功能是接受消息并处理
      override def receive: Receive = {
        case 1 => println("this is first line")
        case 2 => println("this is two line")
        case 3 => {
          println("stop actorRef")
          context.stop(self) // 停止自己的actorRef
    
          println("stop ActorSystem")
          context.system.terminate() // 关闭ActorSystem,即关闭其内部的线程池(ExcutorService)
    
        }
      }
    }
    
    object ActorDM {
      /**
        * 创建线程池对象MyFactory,用来创建actor的对象的
        */
      private val MyFactory = ActorSystem("myFactory")    //里面的"myFactory"参数为线程池的名称
      /**
        *     通过MyFactory.actorOf方法来创建一个actor,注意,Props方法的第一个参数需要传递我们自定义的HelloActor类,
        *     第二个参数是给actor起个名字
        */
      private val helloActorRef = MyFactory.actorOf(Props[HelloActor], "helloActor")
    
      def main(args: Array[String]): Unit = {
        var flag = true
        while(flag){
          /**
            * 使用helloActorRef来给自己发送消息,helloActorRef有一个叫做感叹号("!")的方法来发送消息
            */
          for(num <- 1 to 3){
            if (num < 3){
              helloActorRef ! num
            }else if(num == 3){
              flag = false
              println("程序即将结束!")
              helloActorRef ! num
            }
          }
    
          /**
            * 为了不让while的运行速度在receive方法之上,我们可以让他休眠0.1秒
            */
          Thread.sleep(100)
        }
      }
    }
    

    信息交互

    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    
    // 传入一个 ActorRef 对象
    class User1(val user2: ActorRef) extends Actor {
      override def receive: Receive = {
        case "开始" => user2 ! "到你了"
        case "到你了" => { // 再次发送信息给user2
          println("User1: 我的完成了!")
          user2 ! "到你了"
        }
      }
    }
    
    class User2 extends Actor {
      override def receive: Receive = {
        case "到你了" => {
          println("User2: 将军!")
          Thread.sleep(2000)
          // 反馈信息给 user1
          sender() ! "到你了"
        }
      }
    }
    
    object AkkaActorDM extends App {
      
      //  创建 actorSystem的工厂,用来生产ActorRef对象!
      private val actorSystem = ActorSystem("local_Actor")
      
      // 创建user2 的ActorRef对象 
      private val user2 = actorSystem.actorOf(Props[User2], "user2")
      
      // Props(new User1(user2)) 来创建需要传参的user1类
      private val user1 = actorSystem.actorOf(Props(new User1(user2)), "user1")
      
      // 发送开始信号
      user1 ! "开始"
    }
    

    服务端和客户端交互程序

    Message.scala

    case class ServerMessage(str: String) {}
    
    case class ClientMessage(msg: Any) {}
    

    ServerAKKA.scala

    import akka.actor.{Actor, ActorSystem, Props}
    import com.typesafe.config.{Config, ConfigFactory}
    
    // 继承 akka 的trait Actor
    class ServerAKKA extends Actor {
      // 反复调用,接收发送的信息
      override def receive: Receive = {
        case "start" => println("服务已启动!")
        case ClientMessage(msg) => { // 接收客户端发送的信息
          if (msg.equals("stop")) {
            context.stop(self) // 停止自己的actorRef
            context.system.terminate() // 关闭ActorSystem,即关闭其内部的线程池(ExcutorService)
          }
          println(s"来自客户端的信息是: " + msg)
          // 返回信息响应客户端
          sender ! ServerMessage(s"你发送的 [ $msg ] 信息服务器已收到!")
        }
        case _ => println("Other info!")
      }
    }
    
    object ServerAKKA {
      def main(args: Array[String]): Unit = {
        // 服务器IP
        val host = "192.168.1.104"
        // 服务器端口
        val port = "8888"
    
        // 设置配置字符串
        val strConf =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
         """.stripMargin
    
        // 解析字符串
        val config: Config = ConfigFactory.parseString(strConf)
    
        // 创建使用伴生对象的apply方法创建ActorSystem
        val actorSystem = ActorSystem("ServerAKKA", config)
    
        //通过ServerAKKA类型,反射创建实例
        val server = actorSystem.actorOf(Props[ServerAKKA], "server")
    
        server ! "start"
    
      }
    }
    

    ClientAKKA.scala

    import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    
    import scala.io.StdIn
    
    
    class ClientAKKA extends Actor {
    
      var server: ActorSelection = _
    
      // 在Actor构造方法后执行,但是在receive方法之前执行, 只执行一次,做一些初始化的操作.
      override def preStart(): Unit = {
    
        val serverName = "server"
    
        // 连接服务器的链接,启动服务器时控制台会打印连接的地址
        // akka.tcp://服务器ActorSystem名@服务器IP:服务器端口/user/Actor名
        server = context.actorSelection(s"akka.tcp://ServerAKKA@192.168.1.104:8888/user/$serverName")
    
      }
    
      override def receive: Receive = {
        // 接收服务器的信息
        case ServerMessage(str) => {
          println(s"来自服务器的信息: " + str)
        }
        // 接收客户端的信息
        case ClientMessage(msg) => {
          server ! ClientMessage(msg)
        }
      }
    }
    
    object ClientAKKA {
      def main(args: Array[String]): Unit = {
        // 客户端IP
        val host = "192.168.1.104"
        // 客户端端口, 同一机器时不能使用服务器的端口(已经占用)
        val port = "8889"
    
        val strConf =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
          """.stripMargin
    
        val conf = ConfigFactory.parseString(strConf)
    
        val actorSystem = ActorSystem("ClientAKKA", conf)
    
        val client = actorSystem.actorOf(Props[ClientAKKA], "client")
    
        // 不断从控制台接收输入发送到服务器.
        var flag = true
        while (flag) {
          Thread.sleep(1000)
          val info = StdIn.readLine("请输入需要发送的信息:")
          if (!info.equals("stop")) {
            client ! ClientMessage(info)
          } else {
            flag = false
          }
        }
    
      }
    }
    

    定时任务

    package AKKA
    
    import akka.actor.{Actor, ActorSystem, Props}
    
    import scala.util.Random
    
    
    case class Task()
    
    class TimingTask extends Actor {
      var random = new Random()
    
      override def receive: Receive = {
        case "start" => {
          // 导入需要的包
          import scala.concurrent.ExecutionContext.Implicits.global
          import scala.concurrent.duration._
          // 设置定时任务, 发送Task给自己
          context.system.scheduler.schedule(0 millis, 5000 millis, self, Task)
    
          println("定时任务开始")
        }
        case Task => {
          println("本次的随机数为: " + random.nextInt())
        }
      }
    }
    
    object TimingTask {
    
      def main(args: Array[String]): Unit = {
        val actorSystem = ActorSystem("ActorSystem")
        val actor = actorSystem.actorOf(Props[TimingTask], "actor")
        actor ! "start"
      }
    }
    
  • 相关阅读:
    (新)Linux 安装、配置 MondoDB
    Docker 简介
    Windows 环境下的mysql安装及端口更换详解
    Jenkins的使用
    .Net Core Web API 上传图片或文件
    (不适用.Net Core)layui+WebApi上传文件、上传图片
    (不适用于.Net Core环境)jquery+WebAPI 上传文件、图片
    Linux .Net Core发布项目及搭建
    Jquery+JavaScript 随笔
    Vue 随笔
  • 原文地址:https://www.cnblogs.com/studyNotesSL/p/11463326.html
Copyright © 2011-2022 走看看