zoukankan      html  css  js  c++  java
  • Spark集群 + Akka + Kafka + Scala 开发(3) : 开发一个Akka + Spark的应用

    [comment]: # Spark集群 + Akka + Kafka + Scala 开发(3) : 开发一个Akka + Spark的应用

    前言

    Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境中,我们已经部署好了一个Spark的开发环境。
    Spark集群 + Akka + Kafka + Scala 开发(2) : 开发一个Spark应用中,我们已经写好了一个Spark的应用。
    本文的目标是写一个基于akka的scala工程,在一个spark standalone的集群环境中运行。

    akka是什么?

    akka的作用

    akka的名字是action kernel的回文。根据官方定义:akka用于resilient elastic distributed real-time transaction processing。
    个人理解是:
    resilient:是指对需求和安全性等方面(来自于外部的)的一种适应力(弹性)。
    elastic:是指对资源利用方面的弹性。
    因此,akka是一个满足需求弹性、资源分配弹性的分布式实时事务处理系统。
    akka只是一个类库,一个工具,并没有提供一个平台。

    akka的运行模式和用例

    • akka有两种运行模式:
      • As a library: 一个使用于web应用,把akka作为一个普通的jar包放到classpath或者WEB-INF/lib
      • As an application: 也称为micro system。
    • akka的用例
      akka的用例很多,可以参照Examples of use-cases for Akka.

    本文中的用例

    在本文中,一个Spark + akka的环境里,akka被用于as an application模式下。
    我们会创建一个akka工程,含有两个应用:

    • akka host application
      建立一个actor system, 定义了所有的任务(actors)。等待客户端的请求。
      部分actor使用了spark的云计算功能。
      这是一个spark的应用。
    • akka client application
      调用host application上特定的actor。

    我们看出,这里我们把akka作为一个任务处理器,并通过spark来完成任务。

    项目结构和文件说明

    说明

    这个工程包含了两个应用。
    一个Consumer应用:CusomerApp:实现了通过Spark的Stream+Kafka的技术来实现处理消息的功能。
    一个Producer应用:ProducerApp:实现了向Kafka集群发消息的功能。

    文件结构

    AkkaSampleApp    # 项目目录
    |-- build.bat    # build文件    
    |-- src
        |-- main
            |-- resources
                |-- application.conf   # Akka Server应用的配置文件
                |-- client.conf        # Akka Client应用的配置文件
            |-- scala
                |-- ClientActor.scala       # Akka Client的Actor:提供了一种调用Server Actor的方式。
                |-- ClientApp.scala         # Akka Client应用
                |-- ProductionReaper.scala  # Akka Shutdown pattern的实现者
                |-- Reaper.scala            # Akka Shutdown pattern的Reaper抽象类
                |-- ServerActor.scala       # Akka Server的Actor,提供一个求1到n的MapReduce计算。使用了Spark。
                |-- ServerApp.scala         # Akka Server应用
    

    构建工程目录

    可以运行:

    mkdir AkkaSampleApp
    mkdir -p /AkkaSampleApp/src/main/resources
    mkdir -p /AkkaSampleApp/src/main/scala
    

    代码

    build.sbt

    name := "akka-sample-app"
     
    version := "1.0"
     
    scalaVersion := "2.11.8"
    
    scalacOptions += "-feature"
    scalacOptions += "-deprecation"
    scalacOptions += "-language:postfixOps"
     
    libraryDependencies ++= Seq(
      "com.typesafe.akka" %% "akka-actor" % "2.4.10",
      "com.typesafe.akka" %% "akka-remote" % "2.4.10",
      "org.apache.spark" %% "spark-core" % "2.0.0"
    )
    
    resolvers += "Akka Snapshots" at "http://repo.akka.io/snapshots/"
    

    application.conf

    akka {
      #loglevel = "DEBUG"
      actor {
        provider = "akka.remote.RemoteActorRefProvider"
      }
      remote {
        enabled-transports = ["akka.remote.netty.tcp"]
        netty.tcp {
          hostname = "127.0.0.1"
          port = 2552
        }
        #log-sent-messages = on
        #log-received-messages = on
      }
    }
    

    cient.conf

    akka {
      #loglevel = "DEBUG"
      actor {
        provider = "akka.remote.RemoteActorRefProvider"
      }
      remote {
        enabled-transports = ["akka.remote.netty.tcp"]
        netty.tcp {
          hostname = "127.0.0.1"
          port = 0
        }
        #log-sent-messages = on
        #log-received-messages = on
      }
    }
    

    注:port = 0表示这个端口号会自动生成一个。

    ClientActor.scala

    import akka.actor._
    import akka.event.Logging
    
    class ClientActor(serverPath: String) extends Actor {
      val log = Logging(context.system, this)
      val serverActor = context.actorSelection(serverPath)
    
      def receive = {
        case msg: String =>
            log.info(s"ClientActor received message '$msg'")
            serverActor ! 10000L
      }
    }
    

    ClientApp.scala

    import com.typesafe.config.ConfigFactory
    import akka.actor._
    import akka.remote.RemoteScope
    import akka.util._
    
    import java.util.concurrent.TimeUnit
    
    import scala.concurrent._
    import scala.concurrent.duration._
    
    object ClientApp {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("LocalSystem", ConfigFactory.load("client"))
        
        // get the remote actor via the server actor system's address
        val serverAddress = AddressFromURIString("akka.tcp://ServerActorSystem@127.0.0.1:2552")
        val actor = system.actorOf(Props[ServerActor].withDeploy(Deploy(scope = RemoteScope(serverAddress))))
    
        // invoke the remote actor via a client actor.
        // val remotePath = "akka.tcp://ServerActorSystem@127.0.0.1:2552/user/serverActor"
        // val actor = system.actorOf(Props(classOf[ClientActor], remotePath), "clientActor")
    
        buildReaper(system, actor)
    
        // tell
        actor ! 10000L
        
        waitShutdown(system, actor)
      }
    
      private def buildReaper(system: ActorSystem, actor: ActorRef): Unit = {
        import Reaper._
        val reaper = system.actorOf(Props(classOf[ProductionReaper]))
        
        // Watch the action
        reaper ! WatchMe(actor)
      }
    
      private def waitShutdown(system: ActorSystem, actor: ActorRef): Unit = {
        // trigger the shutdown operation in ProductionReaper
        system.stop(actor)
        
        // wait to shutdown
        Await.result(system.whenTerminated, 60.seconds)
      }
    }
    

    ProductionReaper.scala

    当所有的Actor停止后,终止Actor System。

    class ProductionReaper extends Reaper {
      // Shutdown
      def allSoulsReaped(): Unit = {
        context.system.terminate()
      }
    }
    

    Reaper.scala

    import akka.actor.{Actor, ActorRef, Terminated}
    import scala.collection.mutable.ArrayBuffer
    
    object Reaper {
      // Used by others to register an Actor for watching
      case class WatchMe(ref: ActorRef)
    }
    
    abstract class Reaper extends Actor {
      import Reaper._
    
      // Keep track of what we're watching
      val watched = ArrayBuffer.empty[ActorRef]
    
      // Derivations need to implement this method.  It's the
      // hook that's called when everything's dead
      def allSoulsReaped(): Unit
    
      // Watch and check for termination
      final def receive = {
        case WatchMe(ref) =>
          context.watch(ref)
          watched += ref
        case Terminated(ref) =>
          watched -= ref
          if (watched.isEmpty) allSoulsReaped()
      }
    }
    

    ServerActor.scala

    提供一个求1到n平方和的MapReduce计算。

    import akka.actor.Actor
    import akka.actor.Props
    import akka.event.Logging
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkContext._
    import org.apache.spark.SparkConf
    
    class ServerActor extends Actor {
      val log = Logging(context.system, this)
    
      def receive = {
        case n: Long =>
            squareSum(n)
      }
    
      private def squareSum(n: Long): Long = {
        val conf = new SparkConf().setAppName("Simple Application")
        val sc = new SparkContext(conf)
    
        val squareSum = sc.parallelize(1L until n).map { i => 
          i * i
        }.reduce(_ + _)
    
        log.info(s"============== The square sum of $n is $squareSum. ==============")
    
        squareSum
      }
    }
    

    ServerApp.scala

    import scala.concurrent.duration._
    import com.typesafe.config.ConfigFactory
    import akka.actor.ActorSystem
    import akka.actor.Props
    
    object ServerApp {
      def main(args: Array[String]): Unit = {
        val system = ActorSystem("ServerActorSystem")
        val actor = system.actorOf(Props[ServerActor], name = "serverActor")
      }
    }
    

    构建工程

    进入目录AkkaSampleApp。运行:

    sbt package
    

    第一次运行时间会比较长。

    测试应用

    启动Spark服务

    • 启动spark集群master server
    $SPARK_HOME/sbin/start-master.sh
    

    master服务,默认会使用7077这个端口。可以通过其日志文件查看实际的端口号。

    • 启动spark集群slave server
    $SPARK_HOME/sbin/start-slave.sh spark://$(hostname):7077
    

    启动Akka Server应用

    运行:

    $SPARK_HOME/bin/spark-submit --master spark://$(hostname):7077 --class ServerApp target/scala-2.11/akka-sample-app_2.11-1.0.jar
    

    如果出现java.lang.NoClassDefFoundError错误,
    请参照Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境
    确保akka的包在Spark中设置好了。
    注:可以使用Ctrl+C来中断这个Server应用。

    启动Akka Client应用

    新启动一个终端,运行:

    java -classpath ./target/scala-2.11/akka-sample-app_2.11-1.0.jar:$AKKA_HOME/lib/akka/*:$SCALA_HOME/lib/* ClientApp
    # or
    # $SPARK_HOME/bin/spark-submit --master spark://$(hostname):7077 --class ClientApp target/scala-2.11/akka-sample-app_2.11-1.0.jar
    

    然后:看看Server应用是否开始处理了。

    总结

    Server应用需要Spark的技术,因此,是在Spark环境中运行。
    Clinet应用,可以是一个普通的Java应用。

    下面请看

    至此,我们已经写好了一个spark集群+akka+scala的应用。下一步请看:
    Spark集群 + Akka + Kafka + Scala 开发(4) : 开发一个Kafka + Spark的应用

    参照

  • 相关阅读:
    java(一) 基础部分
    Spring使用Autowiring自动装配 解决提示报错小技巧
    idea 中dao层自动生成接口
    从git远程仓库Checkout项目到本地
    idea 新建项目上传至git(coding)
    使用Travis CI给hexo部署做持续集成
    LeetCode395-至少有 K 个重复字符的最长子串
    Java注解
    Java反射
    Java反射应用实例
  • 原文地址:https://www.cnblogs.com/steven-yang/p/5926726.html
Copyright © 2011-2022 走看看