zoukankan      html  css  js  c++  java
  • 【Scala】利用akka实现Spark启动通信


    思路分析

    1.首先启动master,然后依次启动worker
    2.启动worker时,向master发送注册信息(使用case class封装注册信息——workerID,memory,cores)
    3.接收注册信息,保存注册的worker信息,返回注册成功的消息
    4.worker需要定时向master发送心跳信息,这么做的目的是报活
    5.master需要定时进行心跳超时检测,剔除心跳超时的worker


    步骤

    一、创建maven工程,导包

    <properties>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.11.8</scala.version>
        <scala.compat.version>2.11</scala.compat.version>
    </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
    
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.11</artifactId>
            <version>2.3.14</version>
        </dependency>
    
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_2.11</artifactId>
            <version>2.3.14</version>
        </dependency>
    
    </dependencies>
    
    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <!-- 限制jdk的编译版本插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
    
    
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
    
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                    <resource>reference.conf</resource>
                                </transformer>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass></mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    

    二、master进程代码开发

    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import com.typesafe.config.{Config, ConfigFactory}
    
    import scala.collection.mutable
    import scala.collection.mutable.ArrayBuffer
    
    import scala.concurrent.duration._
    
    //todo 利用akka实现spark通信---master端
    class Master extends Actor {
    
      //定义数据结构map 保存worker注册信息  k:workerId  v:workerInfo
      private val workerInfoMap = new mutable.HashMap[String, WorkerInfo]()
    
      //定义数据结构保存worker信息,便于后续业务根据worker资源排序 (可选项)
      private val workerInfoArray = new ArrayBuffer[WorkerInfo]()
    
      override def preStart(): Unit = {
        //在初始化中完成首次心跳超时检查,后续间隔指定的时间检查
        //todo 记得导包
        import context.dispatcher
        context.system.scheduler.schedule(0 millis,12000 millis,self,CheckTimeOut)
      }
      //用receive方法持续不断接收处理akka actor的消息
      override def receive: Receive = {
        //用于接收注册信息
        case RegisterMessage(workerId,memory,cores) => {
          //判断worker是否注册
          if (!workerInfoMap.contains(workerId)){
            //如果未注册
            val workerInfo = new WorkerInfo(workerId, memory, cores)
            workerInfoMap.put(workerId,workerInfo)
            workerInfoArray += workerInfo
    
            //注册成功 返回注册成功信息
            sender ! RegisterSuccess(s"$workerId,congratulations!You successfully login!")
          }
        }
          //用于接收worker心跳信息
        case HeartBeatMessage(workerId) =>{
          //判断workerId是否注册,如果注册 更新上次心跳时间
            if (workerInfoMap.contains(workerId)){
              //把当前系统的时间更新为worker上次心跳的时间
              val nowTime: Long = System.currentTimeMillis()
              val info: WorkerInfo = workerInfoMap(workerId)
              info.lastHeartBeatTime = nowTime
            }
        }
          //用于心跳超时检查
        case CheckTimeOut =>
          //如何判断心跳超时?
          //当前时间 - 上次心跳时间(10s) > 10   如果没有网络波动应该是这样
          //如果要考虑网络波动,则可以   当前时间 - 上次心跳时间(10s) > 10
          val outTimeWorker: ArrayBuffer[WorkerInfo] = workerInfoArray.filter(x => System.currentTimeMillis() - x.lastHeartBeatTime > 12000)
          //遍历超时worker并剔除
          for (w <- outTimeWorker){
            val workerId = w.workerId
            workerInfoMap.remove(workerId)
            workerInfoArray -= w
            //打印超时的workId
            println(s"$workerId 已超时")
          }
          //打印当前存活的workerId数
          println(s"当前存活的worker个数是 ${workerInfoArray.size}")
          //worker排序
          println(workerInfoArray.sortBy(x => x.memory).reverse)
      }
    }
    
    object Master {
      def main(args: Array[String]): Unit = {
        //master host
        val host = args(0)
        //master port
        val port = args(1)
    
        //创建config需要解析的字符串
        val configStr: String =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
             |""".stripMargin
    
        //创建ActorSystem需要的config
        val config: Config = ConfigFactory.parseString(configStr)
    
        //创建ActorSystem
        val masterActorSystem: ActorSystem = ActorSystem.create("masterActorSystem", config)
        //创建masterActor
        val master: ActorRef = masterActorSystem.actorOf(Props(new Master), "masterActor")
      }
    }
    

    三、worker进程代码开发

    import java.util.UUID
    
    import akka.actor.{Actor, ActorRef, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.{Config, ConfigFactory}
    
    import scala.concurrent.duration._
    
    //todo 利用akka实现spark通信---worker端
    class Worker(memory: Int,cores: Int) extends Actor {
    
      var master: ActorSelection = _
    
      //workerID
      val workerId = UUID.randomUUID().toString
    
      override def preStart(): Unit = {
    
    
        //引用master (协议、masterActorSystem、ip、端口号、actorMaster、actor层级)
        master = context.actorSelection("akka.tcp://masterActorSystem@192.168.0.108:12323/user/masterActor")
        //给master发送注册信息 使用case class封装
        master ! RegisterMessage(workerId,memory,cores)
      }
    
      override def receive: Receive = {
            //接收master返回的注册成功信息
        case RegisterSuccess(msg) => {
          println(msg)
          //注册成功,立即开始首次心跳,以后间隔指定的时间进行心跳
          //需要四个参数,心跳开始时间、心跳间隔时间、发给谁、发送信息
          //定时的消息只能发送给自己
          //todo 记得导包
          import context.dispatcher
          context.system.scheduler.schedule(0 millis,10000 millis,self,HeartBeat)
        }
          //接收心跳提醒,完成真正的心跳动作
        case HeartBeat => {
          master ! HeartBeatMessage(workerId)
        }
      }
    }
    
    object Worker{
      def main(args: Array[String]): Unit = {
        //worker host
        val host = args(0)
        //worker port
        val port = args(1)
    
        //worker memory
        val memory = args(2).toInt
        //worker cores
        val cores = args(3).toInt
    
        //创建config要解析的字符串
        val configStr: String =
          s"""
             |akka.actor.provider = "akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = "$host"
             |akka.remote.netty.tcp.port = "$port"
             |
             |""".stripMargin
    
        //创建ActorSystem需要的configuration
        val config: Config = ConfigFactory.parseString(configStr)
        //创建ActorSystem
        val workerActorSystem: ActorSystem = ActorSystem.create("workerActorSystem", config)
        //创建workerActor
        val worker: ActorRef = workerActorSystem.actorOf(Props(new Worker(memory,cores)), "workerActor")
      }
    }
    
  • 相关阅读:
    DFS的联通性问题
    Stl-unordered_map 无序关联式容器的基本用法(xmind)
    【图论】匈牙利算法——社会人数规模专家
    AcWing 860. 染色法判定二分图
    AcWing 1227. 分巧克力(二分)
    【图论】【最小生成树】prim【AcWing】局域网&&繁忙的都市
    【图论】拓扑排序
    Stl—bitset用法
    vector< vector<int> > 的初始化
    Floyd——人人都是中间商(50%)
  • 原文地址:https://www.cnblogs.com/zzzsw0412/p/12772395.html
Copyright © 2011-2022 走看看