zoukankan      html  css  js  c++  java
  • 初见akka-01

      最近在学习akka,在看rpc相关的东西,有点脑子疼,哈哈

      1.需求:

        目前大多数分布式架构底层通信是通过RPC实现的,RPC框架非常多,

        比如我们学过的Hadoop项目的RPC通信框架,但是Hadoop在设计之初就

        是为了运行长达数小时的批量而设计的,在某些极端的情况下,

        任务提交的延迟很高,所有Hadoop的RPC显得有些笨重。

      2.特点

        Spark 的RPC是通过Akka类库实现的,Akka用Scala语言开发,

        基于Actor并发模型实现,

        Akka具有高可靠、高性能、可扩展等特点,使用Akka可以轻松

        实现分布式RPC功能

      3.Akka简介

        Akka基于Actor模型,提供了一个用于构建可扩展的(Scalable)、

        弹性的(Resilient)、快速响应的(Responsive)应用程序的平台

      4.一句话描述RPC:

        不同进程之间的通信调用叫做RPC,只要有网络通信即可

      5.进程和线程之间的关系

        一个进程包含多个线程,因为启动一个进程就相当于启动了

        一个jvm(虚拟机)

      6.重要的类的描述

        ActorSystem是这个进程中Actor的老大,负责和监控所有的actor,

        我们可以使用这个,ActorSystem创建很多个Actor,通常是

        一个单例对象,Actor负责通信

      7.关于一个简单的akka的小例子,自己给自己发送信息

    package cn.xx.rpc
    
    import akka.actor.{Actor, ActorSystem}
    import akka.actor.Actor.Receive
    import com.typesafe.config.ConfigFactory
    import akka.actor.Props
    
    /**
      * Created by XX on 2016/12/23.
      */
    class Master extends Actor {
    
      println("constructor invoked")
    
      //用于接收消息
      override def receive: Receive = {
        case "connect" => {
          println("a client connected")
        }
        case "hello" =>{
          println("hello")
        }
      }
    
      override def preStart(): Unit = {
         println("prestart invoked")
      }
    }
    
    object Master{
      def main(args: Array[String]): Unit = {
        val host = args(0)
        val port = args(1).toInt
        // 准备配置
        val configStr =
          s"""                                                //这个s要确定,只有这样才能加入变量
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
           """.stripMargin
        val config = ConfigFactory.parseString(configStr)
        //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
        val actorSystem = ActorSystem("MasterSystem",config )
        //创建Actor
        val master = actorSystem.actorOf(Props[Master],"Master")
        master ! "hello"
        actorSystem.awaitTermination()
      }
    }

      8.简单的不同通信之间的RPC的通行

      Master.scala

    package cn.wj.rpc
    
    import akka.actor.{Actor, ActorSystem}
    import akka.actor.Actor.Receive
    import com.typesafe.config.ConfigFactory
    import akka.actor.Props
    
    /**
      * Created by WJ on 2016/12/23.
      */
    class Master extends Actor {
    
      println("constructor invoked")
    
      //用于接收消息
      override def receive: Receive = {
        case "connect" => {
          println("a client connected")
          sender ! "reply"  //往发送给他消息的人回复一个消息
        }
        case "hello" =>{
          println("hello")
        }
      }
    
      override def preStart(): Unit = {
         println("prestart invoked")
      }
    }
    
    object Master{
      def main(args: Array[String]): Unit = {
        val host = args(0)
        val port = args(1).toInt
        // 准备配置
        val configStr =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
           """.stripMargin
        val config = ConfigFactory.parseString(configStr)
        //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
        val actorSystem = ActorSystem("MasterSystem",config )
        //创建Actor
        val master = actorSystem.actorOf(Props[Master],"Master")
        master ! "hello"
        actorSystem.awaitTermination()
      }
    }
    

      Worker.scala

    package cn.wj.rpc
    
    import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    
    /**
      * Created by WJ on 2016/12/23.
      */
    class Worker(val mastHost:String,val mastPort:Int) extends Actor {
    
      var master : ActorSelection = _
    
      //preStart执行方法的时机:构造器之后,receive之前
      //与Master(Actor)建立连接
      override def preStart(): Unit = {
        //master已经是别的Master的引用了
    //    master = context.actorSelection(s"akka.tcp://MasterSystem@$mastHost:$mastPort/user/Master")
        master = context.actorSelection(s"akka.tcp://MasterSystem@192.168.109.1:8888/user/Master")
        //akka.tcp://MasterSystem@192.168.109.1:8888
        master ! "connect"
      }
    
      override def receive: Receive = {
        case "reply" => {
          println("a reply form master")
        }
      }
    }
    
    object Worker{
      def main(args: Array[String]): Unit = {
        val host = args(0)
        val port = args(1).toInt
        val masterHost = args(2)
        val masterPort = args(3).toInt
        // 准备配置
        val configStr =
        s"""
           |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
           |akka.remote.netty.tcp.hostname = "$host"
           |akka.remote.netty.tcp.port = "$port"
           """.stripMargin
        val config = ConfigFactory.parseString(configStr)
        //ActorSystem老大,辅助创建和监控下面的Actor,他是单例的
        val actorSystem = ActorSystem("WorkerSystem",config )
        //创建Actor,此时调用该(Actor)的prestart以及receive方法
          actorSystem.actorOf(Props(new Worker(masterHost,masterPort)),"Worker")
         actorSystem.awaitTermination()
      }
    }
    

      9.通信业务逻辑

        首先启动Master,然后启动所有的Worker
        1.Worker启动后,在preStart方法中与Master建立连接,

         向Master发送注册,将Worker的信息

         通过case class封装起来发送给Master

        2.Master接受Worker的注册消息后将Worker的消息保存起来
        3.Worker定期向Master发送心跳,为了报活

    何当共剪西窗烛,却话巴山夜雨时
  • 相关阅读:
    Redis 主从复制架构配置及原理
    IPTABLES详解(10):IPTABLES自定义链
    ipset
    Atitit 数据库核心技术index索引技术 btree hash lsm fulltxt目录1.1. HASH
    Atitit 存储引擎核心技术 总结目录1. 表的存储有三个文件:结构+数据+索引 12. 页式管理
    Atitit 为什么oracle这类大型数据库比mysql的性能机制目录1. 分区机制差别 11.1. Join算
    Atitit 存储与数据库性能调优流程目录1. 数据库出现性能瓶颈,对外表现有几个方面:
    ATITIT db perf enhs 数据库性能优化 目录 第一章 Cache类 1 第一节 查询cache 1 第二节 Update cache 2 第三节 内存表机制 零时表 2 第四节 雾
    Atitit 核心技术有哪些一般 目录 第一章 Rest调用交互 2 第二章 2 第三章 Cmd调用交互 2 第四章 2 第五章 爬虫技术 2 第一节 Httpclient 2 第二节 Html
    Atitit保证架构超前性 前瞻性 目录 第一章 为什么需要修改代码 1 第一节 业务增加功能 1 第二节 增加字段 1 第三节 增加表数据需要查询 修改 1 第四节 类库升级 1 第二章 简单抽象
  • 原文地址:https://www.cnblogs.com/wnbahmbb/p/6215802.html
Copyright © 2011-2022 走看看