
               Advanced Scala: A Small Case Study of Spark's Underlying Communication

                                Author: Yin Zhengjie

    Copyright notice: this is an original work; reproduction is not permitted and violations will be pursued legally.

    I. Overview of the Spark Master/Worker communication flow

      1>. A worker registers itself with the master;

      2>. After the master receives the worker's registration, it tells the worker that registration succeeded and sends it a message to start an executor;

      3>. After the worker receives the master's reply, it periodically reports its own state to the master;

      4>. When the master receives a worker's heartbeat, it updates that worker's state. Each heartbeat carries the time at which it was sent, so the master compares that time with the current time; if the difference exceeds 5 minutes, it considers the worker dead and stops assigning tasks to it. (A minimal sketch of this check is given below.)

      The communication mechanism between Master and Worker is illustrated by the following diagram: [figure omitted]
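      As a minimal sketch of the liveness rule in step 4> (assuming the 5-minute threshold described above; the demo code later in this post uses a 3-second threshold instead):

    // Sketch of the master-side liveness check described in step 4>.
    val timeoutMs = 5 * 60 * 1000L   // 5 minutes
    def isDead(lastHeartBeatTime: Long): Boolean =
        System.currentTimeMillis() - lastHeartBeatTime > timeoutMs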

    II. Writing the source code

    1>. Maven dependencies

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>

        <groupId>cn.org.yinzhengjie</groupId>
        <artifactId>MyActor</artifactId>
        <version>1.0-SNAPSHOT</version>

        <!-- Define some constants -->
        <properties>
            <encoding>UTF-8</encoding>
            <scala.version>2.11.8</scala.version>
            <scala.compat.version>2.11</scala.compat.version>
            <akka.version>2.4.17</akka.version>
        </properties>

        <dependencies>
            <!-- Scala standard library -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>

            <!-- Akka actor dependency -->
            <dependency>
                <groupId>com.typesafe.akka</groupId>
                <artifactId>akka-actor_${scala.compat.version}</artifactId>
                <version>${akka.version}</version>
            </dependency>

            <!-- Actor communication across processes -->
            <dependency>
                <groupId>com.typesafe.akka</groupId>
                <artifactId>akka-remote_${scala.compat.version}</artifactId>
                <version>${akka.version}</version>
            </dependency>
        </dependencies>

        <!-- Build settings -->
        <build>
            <!-- Location of main and test sources -->
            <sourceDirectory>src/main/scala</sourceDirectory>
            <testSourceDirectory>src/test/scala</testSourceDirectory>
        </build>
    </project>
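    This first POM has no build plugins, so compilation and runs go through the IDE; the scala-maven-plugin and the shade plugin are added in section IV for command-line packaging. To confirm that the Scala and Akka artifacts resolve, a standard Maven command such as the following can be used:

    mvn dependency:tree -Dincludes=com.typesafe.akka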

    2>. MessageProtocol source code

    /*
    @author :yinzhengjie
    Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
    EMAIL:y1053419035@qq.com
    */
    package cn.org.yinzhengjie.spark

    /**
      * worker -> master: messages the worker sends to the master
      */
    case class RegisterWorkerInfo(id: String, core: Int, ram: Int)          // the worker registers its info with the master
    case class HearBeat(id: String)                                         // the worker sends a heartbeat to the master

    /**
      * master -> worker: messages the master sends to the worker, or to itself
      */
    case object RegisteredWorkerInfo                    // the master tells the worker that registration succeeded
    case object SendHeartBeat                           // the worker sends this to itself, telling itself to start sending heartbeats to the master periodically
    case object CheckTimeOutWorker                      // the master sends this to itself to start a scheduler that periodically checks for and removes timed-out workers
    case object RemoveTimeOutWorker                     // the master sends this to itself to remove timed-out workers

    /**
      * Class that stores a worker's information
      * @param id          :    each worker's id is immutable and unique!
      * @param core        :    number of cores on the machine
      * @param ram         :    amount of memory
      */
    case class WorkerInfo(id: String, core: Int, ram: Int) {
        // Time of the last heartbeat; `_` initializes this Long to its default value 0 (not null).
        var lastHeartBeatTime: Long = _
    }
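    Because these messages are plain case classes and case objects, an actor can pattern-match on them directly. A minimal sketch (the EchoActor below is hypothetical and not part of the project):

    import akka.actor.Actor

    // Hypothetical actor in the same package, showing how the protocol messages are matched.
    class EchoActor extends Actor {
        override def receive: Receive = {
            case RegisterWorkerInfo(id, core, ram) => println(s"register: $id, $core cores, $ram MB of RAM")
            case HearBeat(id)                      => println(s"heartbeat from $id")
            case RegisteredWorkerInfo              => println("registration acknowledged")
        }
    }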

    3>. SparkWorker source code

    /*
    @author :yinzhengjie
    Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
    EMAIL:y1053419035@qq.com
    */
    package cn.org.yinzhengjie.spark

    import java.util.UUID

    import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    import scala.concurrent.duration._          // import the duration units (millis, etc.)

    /**
      * The primary constructor takes the master's address.
      */
    class SparkWorker(masterUrl: String) extends Actor{
        var masterProxy: ActorSelection = _                 // reference to the master (an ActorSelection)
        val workId: String = UUID.randomUUID().toString     // the worker's uuid; each worker's id is immutable and unique!
        /**
          * preStart runs once, before receive starts handling messages, so the master
          * reference is looked up exactly one time.
          */
        override def preStart(): Unit = {
            masterProxy = context.actorSelection(masterUrl)
        }
        override def receive: Receive = {
            case "started" => { // the worker itself is ready
                // register with the master: id, core, ram
                masterProxy ! RegisterWorkerInfo(workId, 4, 32 * 1024) // the master will receive this message
            }

            /**
              * Handle the registration-success message sent back by the master
              */
            case RegisteredWorkerInfo => {
                import context.dispatcher // the scheduler needs an implicit ExecutionContext, so import the dispatcher
                /**
                  * The worker starts a timer with "context.system.scheduler.schedule" and periodically
                  * sends heartbeats to the master. The four parameters are:
                  *   1. the initial delay, here 0 milliseconds;
                  *   2. the interval between ticks, here 1 second;
                  *   3. the recipient of the message, here the worker itself (self);
                  *   4. the message to send.
                  * Note: since the message is sent periodically to ourselves, we must also handle it,
                  * which is what the SendHeartBeat case below does.
                  */
                context.system.scheduler.schedule(0 millis, 1000 millis, self, SendHeartBeat)
            }
            case SendHeartBeat => {
                // send a heartbeat to the master
                println(s"------- $workId sends a heartbeat -------")
                masterProxy ! HearBeat(workId) // the master will receive the heartbeat
            }
        }
    }


    object SparkWorker {
        def main(args: Array[String]): Unit = {
            // validate the arguments
            if(args.length != 4) {
                println(
                    """
                      |Usage: <host> <port> <workName> <masterURL>
                    """.stripMargin)
                sys.exit() // exit the program
            }
            /**
              * Arguments: host, port, worker name and the master's URL.
              */
            val host = args(0)
            val port = args(1)
            val workName = args(2)
            val masterURL = args(3)
            /**
              * Use ConfigFactory.parseString to build the configuration object from the arguments
              */
            val config = ConfigFactory.parseString(
                s"""
                  |akka.actor.provider="akka.remote.RemoteActorRefProvider"
                  |akka.remote.netty.tcp.hostname=$host
                  |akka.remote.netty.tcp.port=$port
                """.stripMargin)
            val actorSystem = ActorSystem("sparkWorker", config)
            /**
              * Create the worker's actorRef
              */
            val workerActorRef = actorSystem.actorOf(Props(new SparkWorker(masterURL)), workName)
            workerActorRef ! "started"      // send ourselves a "started" message, meaning the worker is ready
        }
    }
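    The masterUrl handed to context.actorSelection must be a full Akka remote actor path of the form akka.tcp://<systemName>@<host>:<port>/user/<actorName>. For the local test below, the canonical path would look like this (the system name "sparkMaster" and the actor name "master" come from the SparkMaster code and its startup arguments):

    val masterURL = "akka.tcp://sparkMaster@127.0.0.1:8888/user/master"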

    4>. SparkMaster source code

    /*
    @author :yinzhengjie
    Blog:http://www.cnblogs.com/yinzhengjie/tag/Scala%E8%BF%9B%E9%98%B6%E4%B9%8B%E8%B7%AF/
    EMAIL:y1053419035@qq.com
    */
    package cn.org.yinzhengjie.spark

    import akka.actor.{Actor, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    import scala.concurrent.duration._

    class SparkMaster extends Actor{

        // HashMap that stores the information of every registered worker
        val saveWorkerInfo = collection.mutable.HashMap[String, WorkerInfo]()

    //    override def preStart(): Unit = {
    //        context.system.scheduler.schedule(0 millis, 6000 millis, self, RemoveTimeOutWorker)
    //    }

        override def receive: Receive = {
            /**
              * Handle a worker's registration message
              */
            case RegisterWorkerInfo(wkId, core, ram) => {
                /**
                  * Before storing, check whether this worker has already registered;
                  * if not, store its information in the HashMap keyed by wkId.
                  */
                if (!saveWorkerInfo.contains(wkId)) {
                    val workerInfo = new WorkerInfo(wkId, core, ram)
                    saveWorkerInfo += ((wkId, workerInfo))
                    /**
                      * After storing the registration data, tell the worker that it registered successfully
                      */
                    sender() ! RegisteredWorkerInfo // the worker will receive the success message
                }
            }
            /**
              * When the master receives a worker's heartbeat, update that worker's last heartbeat time.
              * get(...) guards against a heartbeat from a worker that was already removed as timed out.
              */
            case HearBeat(wkId) => {
                saveWorkerInfo.get(wkId).foreach { workerInfo =>
                    workerInfo.lastHeartBeatTime = System.currentTimeMillis() // update the heartbeat time
                }
            }
            case CheckTimeOutWorker => {
                import context.dispatcher // the scheduler needs an implicit ExecutionContext, so import the dispatcher
                context.system.scheduler.schedule(0 millis, 5000 millis, self, RemoveTimeOutWorker)
            }
            case RemoveTimeOutWorker => {
                /**
                  * Take all values from the HashMap and check, for each worker, whether the gap between
                  * the current time and its last heartbeat exceeds three heartbeat periods. The heartbeat
                  * period is 1000 ms, so three missed heartbeats give a threshold of 3000 ms.
                  */
                val workerInfos = saveWorkerInfo.values
                val currentTime = System.currentTimeMillis()

                workerInfos
                  .filter(workerInfo => currentTime - workerInfo.lastHeartBeatTime > 3000)        // select the timed-out workers
                  .foreach(workerInfo => saveWorkerInfo.remove(workerInfo.id))                    // remove them
                println(s"====== ${saveWorkerInfo.size} workers still alive ======")
            }
        }
    }

    object SparkMaster {
        def main(args: Array[String]): Unit = {
            // validate the arguments
            if(args.length != 3) {
                println(
                    """
                      |Usage: <host> <port> <masterName>
                    """.stripMargin)
                sys.exit() // exit the program
            }
            /**
              * Arguments: host, port and the master's name
              */
            val host = args(0)
            val port = args(1)
            val masterName = args(2)
            /**
              * Use ConfigFactory.parseString to build the configuration object from the arguments
              */
            val config = ConfigFactory.parseString(
                s"""
                   |akka.actor.provider="akka.remote.RemoteActorRefProvider"
                   |akka.remote.netty.tcp.hostname=$host
                   |akka.remote.netty.tcp.port=$port
                """.stripMargin)

            val actorSystem = ActorSystem("sparkMaster", config)
            val masterActorRef = actorSystem.actorOf(Props[SparkMaster], masterName)
            /**
              * Send ourselves a message that starts a scheduler which periodically
              * removes timed-out workers from the HashMap
              */
            masterActorRef ! CheckTimeOutWorker
        }
    }
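    One refinement worth noting, not in the original code: schedule returns an akka.actor.Cancellable, so the master could keep that handle and cancel the periodic task when the actor stops. A minimal sketch under that assumption:

    import akka.actor.{Actor, Cancellable}
    import scala.concurrent.duration._

    // Sketch: a variant of SparkMaster that cancels its scheduler on shutdown.
    class SparkMasterWithCleanup extends Actor {
        private var checker: Option[Cancellable] = None

        override def receive: Receive = {
            case CheckTimeOutWorker =>
                import context.dispatcher
                checker = Some(context.system.scheduler.schedule(0.millis, 5000.millis, self, RemoveTimeOutWorker))
            case RemoveTimeOutWorker => // timeout sweep, as in SparkMaster above
        }

        // Cancel the periodic task when the actor stops, so it no longer fires.
        override def postStop(): Unit = checker.foreach(_.cancel())
    }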

    III. Local testing

    1>. Start the master

    Startup arguments:
    127.0.0.1 8888 master

    2>. Start the workers

    Startup arguments for the two workers:
    127.0.0.1 6665 worker akka.tcp://sparkMaster@127.0.0.1:8888//user/master
    127.0.0.1 6666 worker akka.tcp://sparkMaster@127.0.0.1:8888//user/master

      Master-side output: [screenshot omitted]

    IV. Packaging master and worker and deploying them to multiple Linux servers for testing

    1>. Packaging SparkMaster

      Step 1: modify the Maven configuration as follows:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>cn.org.yinzhengjie</groupId>
        <artifactId>MyActor</artifactId>
        <version>1.0-SNAPSHOT</version>
        <!-- Define some constants -->
        <properties>
            <encoding>UTF-8</encoding>
            <scala.version>2.11.8</scala.version>
            <scala.compat.version>2.11</scala.compat.version>
            <akka.version>2.4.17</akka.version>
        </properties>
        <dependencies>
            <!-- Scala standard library -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
            <!-- Akka actor dependency -->
            <dependency>
                <groupId>com.typesafe.akka</groupId>
                <artifactId>akka-actor_${scala.compat.version}</artifactId>
                <version>${akka.version}</version>
            </dependency>
            <!-- Actor communication across processes -->
            <dependency>
                <groupId>com.typesafe.akka</groupId>
                <artifactId>akka-remote_${scala.compat.version}</artifactId>
                <version>${akka.version}</version>
            </dependency>
        </dependencies>
        <!-- Build plugins -->
        <build>
            <!-- Location of main and test sources -->
            <sourceDirectory>src/main/scala</sourceDirectory>
            <testSourceDirectory>src/test/scala</testSourceDirectory>
            <plugins>
                <!-- Plugin that compiles Scala -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                            <configuration>
                                <args>
                                    <arg>-dependencyfile</arg>
                                    <arg>${project.build.directory}/.scala_dependencies</arg>
                                </args>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <!-- Maven shade plugin for packaging -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.4.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                        <resource>reference.conf</resource>
                                    </transformer>
                                    <!-- Specify the main class: cn.org.yinzhengjie.spark.SparkMaster -->
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>cn.org.yinzhengjie.spark.SparkMaster</mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>

    The main class is specified as cn.org.yinzhengjie.spark.SparkMaster. The AppendingTransformer merges the reference.conf files from the Akka jars, which the shaded jar needs in order to start.

      Step 2: click package to build the shaded jar: [screenshot omitted]
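      Clicking package in the IDE is equivalent to the standard Maven invocation:

    mvn clean package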

      Step 3: inspect the internal structure of the packaged jar: [screenshot omitted]
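      The jar's contents can also be listed from the command line (standard jar tool; the artifact name follows from the POM's artifactId and version):

    jar tf target/MyActor-1.0-SNAPSHOT.jar | grep SparkMaster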

     

    2>. Packaging SparkWorker

      Step 1: modify the Maven configuration as follows:

    The POM is identical to the SparkMaster one above, except that the ManifestResourceTransformer now points at the worker's main class:

                                    <!-- Specify the main class: cn.org.yinzhengjie.spark.SparkWorker -->
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>cn.org.yinzhengjie.spark.SparkWorker</mainClass>
                                    </transformer>

      The next two steps are the same as for SparkMaster: package the project, rename the resulting jar, and check its main-class information, for example as follows:
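      For example, with standard shell and jar tooling (the jar name follows from the POM; the Main-Class entry is what the ManifestResourceTransformer wrote):

    mv target/MyActor-1.0-SNAPSHOT.jar worker.jar
    unzip -p worker.jar META-INF/MANIFEST.MF | grep Main-Class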

    3>. Start three virtual machines, upload master.jar to the master node, and run it

    [yinzhengjie@s101 download]$ ll
    total 67320
    -rw-r--r--. 1 yinzhengjie yinzhengjie 20124547 Jul 31 20:42 master.jar
    -rw-r--r--. 1 yinzhengjie yinzhengjie 28678231 Jul 20 21:18 scala-2.11.8.tgz
    -rw-r--r--. 1 yinzhengjie yinzhengjie 20124541 Jul 31 21:52 worker.jar
    [yinzhengjie@s101 download]$ 
    [yinzhengjie@s101 download]$ 
    [yinzhengjie@s101 download]$ java -jar master.jar 172.16.30.101 8888 master

     

    4>. Upload worker.jar to the other two nodes and run it, as follows:

      On node 172.16.30.102:
    [yinzhengjie@s102 download]$ ll
    total 19656
    -rw-r--r--. 1 yinzhengjie yinzhengjie 20124541 Jul 31 22:01 worker.jar
    [yinzhengjie@s102 download]$ 
    [yinzhengjie@s102 download]$ java -jar worker.jar 172.16.30.102 6665 worker akka.tcp://sparkMaster@172.16.30.101:8888//user/master
    [yinzhengjie@s102 download]$ 
    
      On node 172.16.30.103:
    [yinzhengjie@s103 download]$ ll
    total 19656
    -rw-r--r--. 1 yinzhengjie yinzhengjie 20124541 Jul 31 22:00 worker.jar
    [yinzhengjie@s103 download]$ 
    [yinzhengjie@s103 download]$ 
    [yinzhengjie@s103 download]$ java -jar worker.jar 172.16.30.103 6665 worker akka.tcp://sparkMaster@172.16.30.101:8888//user/master

      Each worker's console prints its heartbeat line every second, and the master on 172.16.30.101 periodically prints the number of surviving workers. [screenshots omitted]
