zoukankan      html  css  js  c++  java
  • Scala进阶之路-并发编程模型Akka入门篇

                   Scala进阶之路-并发编程模型Akka入门篇

                                      作者:尹正杰

    版权声明:原创作品,谢绝转载!否则将追究法律责任。

    一.Akka Actor介绍

    1>.Akka介绍

       写并发程序很难。程序员不得不处理线程、锁和竞态条件等等,这个过程很容易出错,而且会导致程序代码难以阅读、测试和维护。Akka 是 JVM 平台上构建高并发、分布式和容错应用的工具包和运行时。Akka 用 Scala 语言写成,同时提供了 Scala 和 JAVA 的开发接口。

    2>.Akka 中 中 Actor  模型

      Akka 处理并发的方法基于 Actor 模型。在基于 Actor 的系统里,所有的事物都是 Actor,就好像在面向对象设计里面所有的事物都是对象一样。但是有一个重要区别,那就是 Actor 模型是作为一个并发模型设计和架构的,而面向对象模式则不是。Actor 与 Actor 之间只能通过消息通信。

    3>.Akaka的特点

      第一:它是对并发模型进行了更高的抽象;
      第二:它是异步、非阻塞、高性能的事件驱动编程模型;
      第三:它是轻量级事件处理(1GB 内存可容纳百万级别个 Actor);

    4>.为什么 Actor 模型是一种处理并发问题的解决方案?

      处理并发问题就是如何保证共享数据的一致性和正确性,为什么会有保持共享数据正确性这个问题呢?无非是我们的程序是多线程的,多个线程对同一个数据进行修改,若不加同步条件,势必会造成数据污染。那么我们是不是可以转换一下思维,用单线程去处理相应的请求,但是又有人会问了,若是用单线程处理,那系统的性能又如何保证。Actor 模型的出现解决了这个问题,简化并发编程,提升程序性能。

    5>.Maven依赖

     1 <?xml version="1.0" encoding="UTF-8"?>
     2 <project xmlns="http://maven.apache.org/POM/4.0.0"
     3          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     4          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     5     <modelVersion>4.0.0</modelVersion>
     6 
     7     <groupId>cn.org.yinzhengjie</groupId>
     8     <artifactId>MyActor</artifactId>
     9     <version>1.0-SNAPSHOT</version>
    10 
    11     <!-- 定义一下常量 -->
    12     <properties>
    13         <encoding>UTF-8</encoding>
    14         <scala.version>2.11.8</scala.version>
    15         <scala.compat.version>2.11</scala.compat.version>
    16         <akka.version>2.4.17</akka.version>
    17     </properties>
    18 
    19     <dependencies>
    20         <!-- 添加scala的依赖 -->
    21         <dependency>
    22             <groupId>org.scala-lang</groupId>
    23             <artifactId>scala-library</artifactId>
    24             <version>${scala.version}</version>
    25         </dependency>
    26 
    27         <!-- 添加akka的actor依赖 -->
    28         <dependency>
    29             <groupId>com.typesafe.akka</groupId>
    30             <artifactId>akka-actor_${scala.compat.version}</artifactId>
    31             <version>${akka.version}</version>
    32         </dependency>
    33 
    34         <!-- 多进程之间的Actor通信 -->
    35         <dependency>
    36             <groupId>com.typesafe.akka</groupId>
    37             <artifactId>akka-remote_${scala.compat.version}</artifactId>
    38             <version>${akka.version}</version>
    39         </dependency>
    40     </dependencies>
    41 
    42     <!-- 指定插件-->
    43     <build>
    44         <!-- 指定源码包和测试包的位置 -->
    45         <sourceDirectory>src/main/scala</sourceDirectory>
    46         <testSourceDirectory>src/test/scala</testSourceDirectory>
    47         <plugins>
    48             <!-- 指定编译scala的插件 -->
    49             <plugin>
    50                 <groupId>net.alchim31.maven</groupId>
    51                 <artifactId>scala-maven-plugin</artifactId>
    52                 <version>3.2.2</version>
    53                 <executions>
    54                     <execution>
    55                         <goals>
    56                             <goal>compile</goal>
    57                             <goal>testCompile</goal>
    58                         </goals>
    59                         <configuration>
    60                             <args>
    61                                 <arg>-dependencyfile</arg>
    62                                 <arg>${project.build.directory}/.scala_dependencies</arg>
    63                             </args>
    64                         </configuration>
    65                     </execution>
    66                 </executions>
    67             </plugin>
    68 
    69 
    70         </plugins>
    71     </build>
    72 </project>

      自定义默认的源代码包和测试包的位置,需要手动穿件Source Root目录哟,如下图:

     

    二.编写HelloActor

     1 /*
     2 @author :yinzhengjie
     3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
     4 EMAIL:y1053419035@qq.com
     5 */
     6 package cn.org.yinzhengjie.actor
     7 
     8 import akka.actor.{Actor, ActorSystem, Props}
     9 
    10 import scala.io.StdIn
    11 
    12 class HelloActor extends Actor{
    13     // 重写接受消息的偏函数,其功能是接受消息并处理
    14     override def receive: Receive = {
    15         case "你好帅" => println("竟说实话,我喜欢你这种人!")
    16         case "丑八怪" => println("滚犊子 !")
    17         case "stop" => {
    18             context.stop(self) // 停止自己的actorRef
    19             context.system.terminate() // 关闭ActorSystem,即关闭其内部的线程池(ExcutorService)
    20         }
    21     }
    22 }
    23 
    24 object HelloActor {
    25     /**
    26       * 创建线程池对象MyFactory,用来创建actor的对象的
    27       */
    28     private val MyFactory = ActorSystem("myFactory")    //里面的"myFactory"参数为线程池的名称
    29     /**
    30       *     通过MyFactory.actorOf方法来创建一个actor,注意,Props方法的第一个参数需要传递我们自定义的HelloActor类,
    31       * 第二个参数是给actor起个名字
    32       */
    33     private val helloActorRef = MyFactory.actorOf(Props[HelloActor], "helloActor")
    34 
    35     def main(args: Array[String]): Unit = {
    36         var flag = true
    37         while(flag){
    38             /**
    39               * 接受用户输入的字符串
    40               */
    41             print("请输入您想发送的消息:")
    42             val consoleLine:String = StdIn.readLine()
    43             /**
    44               * 使用helloActorRef来给自己发送消息,helloActorRef有一个叫做感叹号("!")的方法来发送消息
    45               */
    46             helloActorRef ! consoleLine
    47             if (consoleLine.equals("stop")){
    48                 flag = false
    49                 println("程序即将结束!")
    50             }
    51             /**
    52               * 为了不让while的运行速度在receive方法之上,我们可以让他休眠0.1秒
    53               */
    54             Thread.sleep(100)
    55         }
    56     }
    57 }

      以上代码执行结果如下:

    三.两个actor通信案例-模拟下棋对话

     1 /*
     2 @author :yinzhengjie
     3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
     4 EMAIL:y1053419035@qq.com
     5 */
     6 package cn.org.yinzhengjie.actor
     7 
     8 import akka.actor.{ActorSystem, Props}
     9 
    10 import akka.actor.{Actor, ActorRef}
    11 
    12 /**
    13   * 定义玩家1
    14   */
    15 class player1Actor(val p2: ActorRef) extends Actor{
    16     // receive方法是负责处理消息的
    17     override def receive: Receive = {
    18         case "start" => {
    19             println("棋圣:I'm OK !")
    20             p2 ! "该你了"
    21         }
    22         case "将军" => {
    23             println("棋圣:你真猛!")
    24             Thread.sleep(1000)
    25             p2 ! "该你了"
    26         }
    27     }
    28 }
    29 
    30 
    31 /**
    32   * 定义玩家2
    33   */
    34 class player2Actor extends Actor{
    35 
    36     override def receive: Receive = {
    37         case "start" => println("棋仙说:I'm OK !")
    38         case "该你了" => {
    39             println("棋仙:那必须滴!")
    40             Thread.sleep(1000)
    41             /**
    42               * 注意,这个“sender()”,其实就是对ActorRef的一个引用。它指的是给发送"该你了"的这个对象本身!
    43               */
    44             sender() ! "将军"
    45         }
    46     }
    47 }
    48 
    49 
    50 object ChineseChess extends App{
    51     // 创建 actorSystem的工厂,用来生产ActorRef对象!
    52     private val ChineseChessActorSystem = ActorSystem("Chinese-chess")
    53     /**
    54       * 通过actorSystem创建ActorRef
    55       */
    56     private val p2 = ChineseChessActorSystem.actorOf(Props[player2Actor], "player2")            //创建player2Actor对象
    57     private val p1 = ChineseChessActorSystem.actorOf(Props(new player1Actor(p2)), "player1")   //创建player1Actor对象
    58 
    59     p2 ! "start"
    60     p1 ! "start"
    61 }

      运行以上代码输出结果如下:

    四.服务端和客户端交互的小程序

    1>.服务端代码

     1 /*
     2 @author :yinzhengjie
     3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
     4 EMAIL:y1053419035@qq.com
     5 */
     6 package cn.org.yinzhengjie.robot
     7 
     8 import akka.actor.{Actor, ActorSystem, Props}
     9 import com.typesafe.config.ConfigFactory
    10 
    11 class ServerActor extends Actor{
    12     /**
    13       * receive方法是用来处理客户端发送过来的问题的
    14       */
    15     override def receive: Receive = {
    16         case "start" => println("天猫系统已启动...")
    17 
    18         case ClientMessage(msg) => {
    19             println(s"收到客户端消息:$msg")
    20             msg match {
    21                 /**
    22                   * sender()发送端的代理对象, 发送到客户端的mailbox中 -> 客户端的receive
    23                    */
    24                 case "你叫啥" =>
    25                     sender() ! ServerMessage("本宝宝是天猫精灵")
    26                 case "你是男是女" =>
    27                     sender() ! ServerMessage("本宝宝非男非女")
    28                 case "你有男票吗" =>
    29                     sender() ! ServerMessage("本宝宝还小哟")
    30                 case "stop" =>
    31                     context.stop(self) // 停止自己的actorRef
    32                     context.system.terminate() // 关闭ActorSystem,即关闭其内部的线程池(ExcutorService)
    33                     println("天猫系统已停止...")
    34                 case _ =>
    35                     sender() ! ServerMessage("对不起,主人,我不知道你在说什么.......")
    36             }
    37         }
    38     }
    39 }
    40 
    41 object ServerActor {
    42     def main(args: Array[String]): Unit = {
    43         //定义服务端的ip和端口
    44         val host = "127.0.0.1"
    45         val port = 8088
    46         /**
    47           * 使用ConfigFactory的parseString方法解析字符串,指定服务端IP和端口
    48           */
    49         val config = ConfigFactory.parseString(
    50             s"""
    51                |akka.actor.provider="akka.remote.RemoteActorRefProvider"
    52                |akka.remote.netty.tcp.hostname=$host
    53                |akka.remote.netty.tcp.port=$port
    54         """.stripMargin)
    55         /**
    56           * 将config对象传递给ActorSystem并起名为"Server",为了是创建服务端工厂对象(ServerActorSystem)。
    57           */
    58         val ServerActorSystem = ActorSystem("Server", config)
    59         /**
    60           * 通过工厂对象创建服务端的ActorRef
    61           */
    62         val serverActorRef = ServerActorSystem.actorOf(Props[ServerActor], "Miao~miao")
    63         /**
    64           * 到自己的mailbox -》 receive方法
    65           */
    66         serverActorRef ! "start"
    67     }
    68 }

    2>.客户端代码

     1 /*
     2 @author :yinzhengjie
     3 Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
     4 EMAIL:y1053419035@qq.com
     5 */
     6 package cn.org.yinzhengjie.robot
     7 
     8 import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
     9 import com.typesafe.config.ConfigFactory
    10 
    11 import scala.io.StdIn
    12 
    13 class ClientActor(host: String, port: Int) extends Actor{
    14 
    15     var serverActorRef: ActorSelection = _ // 服务端的代理对象
    16 
    17     // 在receive方法之前调用
    18     override def preStart(): Unit = {
    19         // akka.tcp://Server@127.0.0.1:8088
    20         serverActorRef = context.actorSelection(s"akka.tcp://Server@${host}:${port}/user/Miao~miao")
    21     }
    22     // mailbox ->receive
    23     override def receive: Receive = { // shit
    24         case "start" => println("2018天猫精灵为您服务!")
    25         case msg: String => { // shit
    26             serverActorRef ! ClientMessage(msg) // 把客户端输入的内容发送给 服务端(actorRef)--》服务端的mailbox中 -> 服务端的receive
    27         }
    28         case ServerMessage(msg) => println(s"收到服务端消息:$msg")
    29     }
    30 }
    31 
    32 object ClientActor  {
    33     def main(args: Array[String]): Unit = {
    34 
    35         //指定客户端的IP和端口
    36         val host = "127.0.0.1"
    37         val port  = 8089
    38 
    39         //指定服务端的IP和端口
    40         val serverHost = "127.0.0.1"
    41         val serverPort = 8088
    42 
    43         /**
    44           * 使用ConfigFactory的parseString方法解析字符串,指定客户端IP和端口
    45           */
    46         val config = ConfigFactory.parseString(
    47             s"""
    48                |akka.actor.provider="akka.remote.RemoteActorRefProvider"
    49                |akka.remote.netty.tcp.hostname=$host
    50                |akka.remote.netty.tcp.port=$port
    51         """.stripMargin)
    52 
    53         /**
    54           * 将config对象传递给ActorSystem并起名为"Server",为了是创建客户端工厂对象(clientActorSystem)。
    55           */
    56         val clientActorSystem = ActorSystem("client", config)
    57 
    58         // 创建dispatch | mailbox
    59         val clientActorRef = clientActorSystem.actorOf(Props(new ClientActor(serverHost, serverPort.toInt)), "Client")
    60 
    61         clientActorRef ! "start" // 自己给自己发送了一条消息 到自己的mailbox => receive
    62 
    63         /**
    64           * 接受用户的输入信息并传送给服务端
    65           */
    66         while (true) {
    67             Thread.sleep(500)
    68             /**
    69               * StdIn.readLine方法是同步阻塞的
    70               */
    71             val question = StdIn.readLine("请问有什么我可以帮你的吗?>>>")
    72             clientActorRef ! question
    73             if (question.equals("stop")){
    74                 Thread.sleep(500)
    75                 println("程序即将结束")
    76                 System.exit(0)
    77             }
    78         }
    79     }
    80 }

    3>.先执行服务端再执行客户端并输入相应信息测试结果如下:

      客户端运行结果如下:

      服务端运行结果如下:

  • 相关阅读:
    HDU 1423 Greatest Common Increasing Subsequence(LICS入门,只要求出最长数)
    HDU 5455 Fang Fang
    hihoCoder 1234 fractal
    二叉排序树
    最大连续子序列和,最大上升子序列和,最长上升子序列,最长公共子串,最长公共子序列,最长上升公共子序列
    C++ string string string string string string string string string string
    pip安装第三方库
    pip安装时使用国内源,加快下载速度
    pip升级命令
    instanceof -- JS
  • 原文地址:https://www.cnblogs.com/yinzhengjie/p/9376296.html
Copyright © 2011-2022 走看看