zoukankan      html  css  js  c++  java
  • 大数据学习之并发编程模型AKKA 43

    Spark使用底层通信框架AKKA

    分布式

    master

    Worker

    hadoop使用的是rpc

     

    1)akka简介

    写并发程序很难,AKKA解决spark这个问题。

    akka构建在JVM平台上,是一种高并发、分布式、并且容错的应用工具包

    akkascala语言编写同时提供了scalajava的开发接口

    akka可以开发一些高并发程序。

    1)Akka的Actor模型

    akka处理并发的方法基于actor模型

    在基于actor的系统中,所有事物都是actor

    actor作为一个并发模型设计和架构的,面向对象不是。

    actoractor之间只能通过消息通信。

    Akka特点:

    1对并发模型进行了更高的抽象

    2)异步、非阻塞、高性能的事件驱动编程模型

    3)轻量级事件处理(1G内存可以容纳百万级别的Actor

    同步:阻塞(发消息 一直等待消息)

    异步:不阻塞(发消息 不等待 该干嘛干嘛)

    actor简化了并发编程,提高了程序性能。

    3:AKKA编程

    Actor工作机制

    1)需求

    我发消息,自己收

    Step1:创建一个Maven项目

    Pom文件如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.dawn</groupId>
        <artifactId>akka</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <!-- 定义版本常量 -->
        <properties>
            <encoding>UTF-8</encoding>
            <scala.version>2.11.8</scala.version>
            <scala.compat.version>2.11</scala.compat.version>
            <akka.version>2.4.17</akka.version>
        </properties>
    
        <dependencies>
            <!-- 添加scala包的依赖 -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
    
            <!-- 添加akka包的actor依赖 -->
            <dependency>
                <groupId>com.typesafe.akka</groupId>
                <artifactId>akka-actor_${scala.compat.version}</artifactId>
                <version>${akka.version}</version>
            </dependency>
    
            <!-- 多进程之间的Actor通信设置 -->
            <dependency>
                <groupId>com.typesafe.akka</groupId>
                <artifactId>akka-remote_${scala.compat.version}</artifactId>
                <version>${akka.version}</version>
            </dependency>
        </dependencies>
    
        <!-- 指定使用插件-->
        <build>
            <!-- 指定源码包和测试包的位置信息 -->
            <sourceDirectory>src/main/scala</sourceDirectory>
            <testSourceDirectory>src/test/scala</testSourceDirectory>
            <plugins>
                <!-- 指定编译scala的插件 -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                            <configuration>
                                <args>
                                    <arg>-dependencyfile</arg>
                                    <arg>${project.build.directory}/.scala_dependencies</arg>
                                </args>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
    
                <!-- maven打包使用的插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.4.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                        <resource>reference.conf</resource>
                                    </transformer>
                                    <!-- 指定main方法 -->
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>com.itstaredu.spark.SparkWorker</mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    
    </project>

    Step2:编写CallMe

    import akka.actor.{Actor, ActorSystem, Props}
    
    /**
      * @author Dawn
      * 2019年6月16日20:49:22
      * @version 1.0
      * Akka的Actor模型 自己发给自己
      */
    object CallMe {
      //1.创建ActorSystem 用ActorSystem创建Actor
      private val acFactory = ActorSystem("AcFactory")
      //2.Actor发送消息通过ActorRef
      private val callRef = acFactory.actorOf(Props[CallMe],"CallMe")
    
      def main(args: Array[String]): Unit = {
        callRef ! "dawn"
        callRef !"King"
        callRef ! "stop"
      }
    
    }
    
    class CallMe extends Actor{
      //Receive用户接受消息并且处理消息
      override def receive: Receive = {
        case "dawn" => println("你好啊!!")
        case "King" => println("are you King?")
        case "stop" => {
          //关闭代理ActorRef
          context.stop(self)
          //关闭ActorSystem
          context.system.terminate()
        }
      }
    }
    

      

    运行结果:

    2需求

    一个Actor发送消息,另外一个Actor接收消息

    Step1:编写一个DawnActorYaYaActor

    DawnActor:

    import akka.actor.Actor
    /**
      * @author Dawn
      * 2019年6月16日21:10:40
      * @version 1.0
      * DawnActor
      */
    class DawnActor extends Actor{
      override def receive: Receive = {
        case "hello i am YaYa" => println("hi i am dawn")
        case "you are handsome" => println("you are beautiful")
        case "你好我是dawn" => println("哈哈哈哈")
      }
    
    }
    

      

    YaYaActor:

    import akka.actor.{Actor, ActorRef}
    
    /**
      * @author Dawn
      * 2019年6月16日21:10:14
      * @version 1.0
      * YaYaActor
      */
    class YaYaActor(val d:ActorRef) extends Actor{
      override def receive: Receive = {
        case "你好 我是YaYa" => {
          //YaYa给Dawn发消息
          d ! "你好我是dawn"
        }
      }
    }
    

      

    Step2:编写一个驱动类

    import akka.actor.{ ActorSystem, Props}
    
    /**
      * @author Dawn
      * 2019年6月16日21:16:23
      * @version 1.0
      * 创建一个驱动类
      */
    object QqDriver {
      //1.创建ActorSystem 用ActorSystem创建Actor
      private val qqFactory = ActorSystem("QqFactory")
      //2.Actor发送消息通过ActorRef
      private val dawnRef = qqFactory.actorOf(Props[DawnActor],"Dawn")
      //YaYa需要接收Dawn发送的消息
      private val yaYaRef = qqFactory.actorOf(Props(new YaYaActor(dawnRef)),"YaYa")
    
      def main(args: Array[String]): Unit = {
        //1.dawn 给自己发消息 根第一个那个CallMe一样的
    //    dawnRef ! "you are handsome"
        //2.YaYa 给Dawn发消息
        yaYaRef ! "你好 我是YaYa"
      }
    }
    

      

    运行结果:

  • 相关阅读:
    mybatis 二级缓存
    前端学习记
    消息队列高手课 笔记11
    cache业务
    这个前端课程主要讲mui框架
    spring cloud stream
    最近学习freemarker
    说点什么
    即将进入Windows 11时代,DevExpress控件将会有哪些改变呢?
    UI开发框架Kendo React R3 2021更新亮点——新的 React 组件
  • 原文地址:https://www.cnblogs.com/hidamowang/p/11051408.html
Copyright © 2011-2022 走看看