zoukankan      html  css  js  c++  java
  • ScalaPB(1): using protobuf in akka

        任何类型的实例作为消息在两端独立系统的机器之间进行传递时必须经过序列化/反序列化serialize/deserialize处理过程。假设以下场景:在一个网络里有两台连接的服务器,它们分别部署了独立的akka系统。如果我们需要在这两台服务器的akka系统之间进行消息交换的话,所有消息都必须经过序列化/反序列化处理。akka系统对于用户自定义消息类型的默认序列化处理是以java-object serialization 方式进行的。我们上次提过:由于java-object-serialization会把一个java-object的类型信息、实例值、它所包含的其它类型描述信息等都写入序列化的结果里,所以会占据较大空间,传输数据的效率相对就低了。protobuf是binary格式的,基本只包括实例值,所以数据传输效率较高。下面我们就介绍如何在akka系统中使用protobuf序列化。在akka中使用自定义序列化方法包括下面的这些步骤:

    1、在.proto文件中对消息类型进行IDL定义

    2、用ScalaPB编译IDL文件并产生scala源代码。这些源代码中包括了涉及的消息类型及它们的操作方法

    3、在akka程序模块中import产生的classes,然后直接调用这些类型和方法

    4、按akka要求编写序列化方法

    5、在akka的.conf文件里actor.serializers段落中定义akka的默认serializer

    下面的build.sbt文件里描述了程序结构:

    // Settings shared by both projects; each project overrides `name` further down.
    lazy val commonSettings = Seq(
      name := "AkkaProtobufDemo",
      version := "1.0",
      scalaVersion := "2.12.6",
    )
    
    // Project rooted at the build's base directory (called "local" in the article).
    lazy val local = (project in file("."))
      .settings(commonSettings)
      .settings(
        libraryDependencies ++= Seq(
          "com.typesafe.akka"      %% "akka-remote" % "2.5.11",
          // NOTE(review): the "protobuf" configuration presumably exposes the .proto files
          // bundled with scalapb-runtime to protoc — confirm against the ScalaPB docs.
          "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf"
        ),
        name := "akka-protobuf-demo"
      )
    
    // Project in the `remote` directory; depends on `local`.
    lazy val remote = (project in file("remote"))
      .settings(commonSettings)
      .settings(
        libraryDependencies ++= Seq(
          "com.typesafe.akka"      %% "akka-remote" % "2.5.11"
        ),
        name := "remote-system"
      ).dependsOn(local)
    
    // Have ScalaPB write the generated Scala sources into the managed-sources directory.
    PB.targets in Compile := Seq(
      scalapb.gen() -> (sourceManaged in Compile).value
    )
    

    local和remote是两个分开的项目。我们会在这两个项目里分别部署akka系统。注意依赖项中的scalapb-runtime。PB.targets指明了产生源代码的路径。我们还需要在project/scalapb.sbt中指定ScalaPB插件:

    // sbt-protoc plugin, plus the ScalaPB compiler plugin added to the build's own dependencies.
    addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
    libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"

    我们首先在.proto文件里定义消息:

    syntax = "proto3";
    
    // Brought in from scalapb-runtime
    import "scalapb/scalapb.proto";
    import "google/protobuf/wrappers.proto";
    
    package learn.proto;
    
    // Request carrying the two operands of an addition.
    message Added {
    
        int32 nbr1 = 1;
        int32 nbr2 = 2;
    }
    
    // Request carrying the two operands of a subtraction.
    message Subtracted {
        int32 nbr1 = 1;
        int32 nbr2 = 2;
    }
    
    // Reply to Added: both operands plus the computed result.
    message AddedResult {
        int32 nbr1 = 1;
        int32 nbr2 = 2;
        int32 result = 3;
    }
    
    // Reply to Subtracted: both operands plus the computed result.
    message SubtractedResult {
        int32 nbr1 = 1;
        int32 nbr2 = 2;
        int32 result = 3;
    }

    现在我们先在remote项目里定义一个Calculator actor:

    package akka.protobuf.calculator
    import akka.actor._
    import com.typesafe.config.ConfigFactory
    import learn.proto.messages._
    
    /**
      * Actor that answers `Added` / `Subtracted` requests by replying to the sender
      * with the matching `AddedResult` / `SubtractedResult`.
      */
    class Calculator extends Actor with ActorLogging {

      override def receive: Receive = {
        case Added(a,b) =>
          // Template + arguments: the text is only assembled when INFO logging is enabled.
          log.info("Calculating {} + {}", a, b)
          sender() ! AddedResult(a,b,a+b)
        case Subtracted(a,b) =>
          log.info("Calculating {} - {}", a, b)
          sender() ! SubtractedResult(a,b,a-b)
      }

    }
    
    /** Companion holding the `Props` factory for [[Calculator]]. */
    object Calculator {
      /** Props that create a fresh `Calculator` instance. */
      def props: Props = Props(new Calculator)
    }
    
    /** Entry point that starts the actor system hosting the `calculator` actor. */
    object CalculatorStarter extends App {

      // Fix the remoting port here; everything else comes from the loaded configuration.
      val config = ConfigFactory
        .parseString("akka.remote.netty.tcp.port=2552")
        .withFallback(ConfigFactory.load())

      val calcSystem = ActorSystem("calcSystem",config)

      calcSystem.actorOf(Calculator.props, name = "calculator")

      println("press any key to end program ...")

      // Block until a line arrives on stdin, then shut the actor system down.
      scala.io.StdIn.readLine()
      calcSystem.terminate()

    }

    运行CalculatorStarter产生一个calculator actor:  akka.tcp://calcSystem@127.0.0.1:2552/user/calculator

    下面我们在local项目里从端口2551上部署另一个akka系统,然后调用端口2552上部署akka系统的calculator actor:

    package akka.protobuf.calcservice
    import akka.actor._
    import learn.proto.messages._
    import scala.concurrent.duration._
    
    /**
      * Local proxy for the calculator actor found at `path`.
      *
      * Starts in the `identifying` state, where it resolves `path` to an `ActorRef`
      * through `Identify` / `ActorIdentity`. It then switches to `calculating`, where
      * requests are forwarded to that ref and the replies are logged.
      *
      * @param path actor path used for the actor selection and as the Identify correlation id
      */
    class CalcRunner(path: String) extends Actor with ActorLogging {
      sendIdentifyRequest()

      /** Sends `Identify` to the selection at `path` and schedules a `ReceiveTimeout` to self after 3 seconds. */
      def sendIdentifyRequest(): Unit = {
        context.actorSelection(path) ! Identify(path)
        import context.dispatcher
        context.system.scheduler.scheduleOnce(3.seconds, self, ReceiveTimeout)
      }

      def receive = identifying

      /** Behaviour while the calculator's `ActorRef` is still unknown. */
      def identifying : Receive = {
        case ActorIdentity(calcPath,Some(calcRef)) if (path.equals(calcPath)) =>
          log.info("Remote calculator started!")
          context.watch(calcRef)
          context.become(calculating(calcRef))
        case ActorIdentity(_,None) =>
          log.info("Remote calculator not found!")
        case ReceiveTimeout =>
          // The scheduled timeout fired while still unidentified: ask again.
          sendIdentifyRequest()
        case s @ _ =>
          log.info(s"Remote calculator not ready. [$s]")
      }

      /**
        * Behaviour once the calculator has been resolved.
        *
        * @param calculator the watched calculator ref that requests are forwarded to
        */
      def calculating(calculator: ActorRef) : Receive = {
        case (op : Added) => calculator ! op
        case (op : Subtracted) => calculator ! op

        case AddedResult(a,b,r)  =>
          log.info(s"$a + $b = $r")
        case SubtractedResult(a,b,r) =>
          log.info(s"$a - $b = $r")

        // Backticks compare against the `calculator` parameter instead of binding a
        // new variable that shadows it.
        case Terminated(`calculator`) =>
          log.info("Remote calculator terminated, restarting ...")
          sendIdentifyRequest()
          context.become(identifying)

        case ReceiveTimeout => //nothing
      }

    }
    
    /** Companion holding the `Props` factory for [[CalcRunner]]. */
    object CalcRunner {
      /** Props that create a `CalcRunner` bound to the given actor `path`. */
      def props(path: String): Props = Props(new CalcRunner(path))
    }

    这个CalcRunner是一个actor,在程序里首先通过向remote项目中的calculator-actor传送Identify消息以取得具体的ActorRef,然后用这个ActorRef与calculator-actor进行交互。这其中Identify是akka预定义的消息类型,其它消息都是ScalaPB从.proto文件中产生的。下面是local项目的运算程序:

    package akka.protobuf.demo
    import akka.actor._
    import akka.util.Timeout
    import com.typesafe.config.ConfigFactory
    import akka.protobuf.calcservice._
    
    import scala.concurrent.duration._
    import scala.util._
    import learn.proto.messages._
    
    /**
      * Local-side entry point: starts an actor system on port 2551, creates a
      * `CalcRunner` pointing at the calculator path, and sends it a random
      * `Added` or `Subtracted` request every second until a line is read from stdin.
      */
    object Main extends App {

      val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2551")
        .withFallback(ConfigFactory.load())

      val calcSystem = ActorSystem("calcSystem",config)

      val calcPath = "akka.tcp://calcSystem@127.0.0.1:2552/user/calculator"

      val calculator = calcSystem.actorOf(CalcRunner.props(calcPath),"calcRunner")

      println("Calculator started ...")

      import calcSystem.dispatcher

      // Keep the handle so the periodic job can be stopped on shutdown.
      private val ticker = calcSystem.scheduler.schedule(1.second, 1.second) {
        if (Random.nextInt(100) % 2 == 0)
          calculator ! Added(Random.nextInt(100), Random.nextInt(100))
        else
          calculator ! Subtracted(Random.nextInt(100), Random.nextInt(100))
      }

      // Block until a line is read, then stop the job and the actor system; without
      // this the system keeps running and sending requests after readLine returns.
      scala.io.StdIn.readLine()
      ticker.cancel()
      calcSystem.terminate()

    }

    配置文件application.conf:

    akka {
    
      actor {
        # Use the remote actor-ref provider.
        provider = remote
      }
    
      remote {
        netty.tcp {
          # The port is not set here; each entry point sets akka.remote.netty.tcp.port itself.
          hostname = "127.0.0.1"
        }
      }
    
    }

    先运行remote然后local。注意下面出现的提示:

    [akka.serialization.Serialization(akka://calcSystem)] Using the default Java serializer for class [learn.proto.messages.Added] which is not recommended because of performance implications. Use another serializer 

    下面是protobuf类型的序列化方法:

    package akka.protobuf.serializer
    
    import akka.serialization.SerializerWithStringManifest
    import learn.proto.messages._
    
    
    /**
      * Akka serializer for the four ScalaPB-generated calculator messages
      * (`Added`, `Subtracted`, `AddedResult`, `SubtractedResult`).
      *
      * The manifest is the message's class name; `fromBinary` uses it to pick the
      * companion `parseFrom` to call.
      */
    class ProtobufSerializer extends SerializerWithStringManifest{

      // Serializer id. NOTE(review): must not collide with any other serializer
      // registered in the actor system — confirm against the Akka serialization docs.
      def identifier: Int = 101110116

      override def manifest(o: AnyRef): String = o.getClass.getName
      final val AddedManifest = classOf[Added].getName
      final val SubtractedManifest = classOf[Subtracted].getName
      final val AddedResultManifest = classOf[AddedResult].getName
      final val SubtractedResultManifest = classOf[SubtractedResult].getName


      /**
        * Rebuilds a message from its protobuf bytes.
        *
        * @param bytes    protobuf-encoded payload
        * @param manifest class name written by [[manifest]]
        * @throws java.io.NotSerializableException if the manifest is not one of the four known messages
        */
      override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {

        println("inside fromBinary"+manifest)

        manifest match {
          case AddedManifest => Added.parseFrom(bytes)
          case SubtractedManifest => Subtracted.parseFrom(bytes)
          case AddedResultManifest => AddedResult.parseFrom(bytes)
          case SubtractedResultManifest => SubtractedResult.parseFrom(bytes)
          // Fail with a descriptive serialization error instead of a bare MatchError.
          case unknown =>
            throw new java.io.NotSerializableException(
              s"Unable to deserialize manifest [$unknown] in [${getClass.getName}]")
        }
      }

      /**
        * Encodes one of the four known messages to its protobuf bytes.
        *
        * @throws IllegalArgumentException if `o` is not one of the four known messages
        */
      override def toBinary(o: AnyRef): Array[Byte] = {

        println("inside toBinary ")
        o match {
          case a: Added => a.toByteArray
          case s :Subtracted => s.toByteArray
          case aR: AddedResult => aR.toByteArray
          case sR: SubtractedResult => sR.toByteArray
          case other =>
            throw new IllegalArgumentException(
              s"Unable to serialize object of type [${other.getClass.getName}] in [${getClass.getName}]")
        }
      }
    }

    然后我们需要在application.conf中告诉akka系统使用这些方法:

      actor {

        serializers {
          # Deliberately not named "proto": akka-remote's reference.conf already defines a
          # serializer under that key, and reusing it would replace Akka's built-in one.
          scalapb = "akka.protobuf.serializer.ProtobufSerializer"
        }

        serialization-bindings {

          "java.io.Serializable" = none
          # NOTE(review): with the rename above, "proto" here resolves to the serializer
          # defined by akka-remote's reference.conf — confirm for your Akka version.
          "com.google.protobuf.Message" = proto
          "learn.proto.messages.Added" = scalapb
          "learn.proto.messages.AddedResult" = scalapb
          "learn.proto.messages.Subtracted" = scalapb
          "learn.proto.messages.SubtractedResult" = scalapb

        }
      }

    现在再重新运行:

    [INFO] [04/30/2018 18:41:02.348] [calcSystem-akka.actor.default-dispatcher-2] [akka.tcp://calcSystem@127.0.0.1:2551/user/calcRunner] Remote calculator started!
    inside toBinary 
    inside fromBinarylearn.proto.messages.AddedResult
    [INFO] [04/30/2018 18:41:03.234] [calcSystem-akka.actor.default-dispatcher-4] [akka.tcp://calcSystem@127.0.0.1:2551/user/calcRunner] 18 + 38 = 56
    inside toBinary 
    inside fromBinarylearn.proto.messages.AddedResult
    [INFO] [04/30/2018 18:41:04.197] [calcSystem-akka.actor.default-dispatcher-4] [akka.tcp://calcSystem@127.0.0.1:2551/user/calcRunner] 22 + 74 = 96

    系统使用了自定义的ProtobufSerializer。

    下面是本次示范的完整源代码:

    project/scalapb.sbt

    // sbt-protoc plugin, plus the ScalaPB compiler plugin added to the build's own dependencies.
    addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
    libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"

    build.sbt

    // Settings shared by both projects; each project overrides `name` further down.
    lazy val commonSettings = Seq(
      name := "AkkaProtobufDemo",
      version := "1.0",
      scalaVersion := "2.12.6",
    )
    
    // Project rooted at the build's base directory (called "local" in the article).
    lazy val local = (project in file("."))
      .settings(commonSettings)
      .settings(
        libraryDependencies ++= Seq(
          "com.typesafe.akka"      %% "akka-remote" % "2.5.11",
          // NOTE(review): the "protobuf" configuration presumably exposes the .proto files
          // bundled with scalapb-runtime to protoc — confirm against the ScalaPB docs.
          "com.thesamet.scalapb" %% "scalapb-runtime" % scalapb.compiler.Version.scalapbVersion % "protobuf"
        ),
        name := "akka-protobuf-demo"
      )
    
    // Project in the `remote` directory; depends on `local`.
    lazy val remote = (project in file("remote"))
      .settings(commonSettings)
      .settings(
        libraryDependencies ++= Seq(
          "com.typesafe.akka"      %% "akka-remote" % "2.5.11"
        ),
        name := "remote-system"
      ).dependsOn(local)
    
    // Have ScalaPB write the generated Scala sources into the managed-sources directory.
    PB.targets in Compile := Seq(
      scalapb.gen() -> (sourceManaged in Compile).value
    )
    

    resources/application.conf

    akka {
      actor {
        # Use the remote actor-ref provider.
        provider = remote

        serializers {
          # Deliberately not named "proto": akka-remote's reference.conf already defines a
          # serializer under that key, and reusing it would replace Akka's built-in one.
          scalapb = "akka.protobuf.serializer.ProtobufSerializer"
        }
        serialization-bindings {
          "java.io.Serializable" = none
          # NOTE(review): with the rename above, "proto" here resolves to the serializer
          # defined by akka-remote's reference.conf — confirm for your Akka version.
          "com.google.protobuf.Message" = proto
          "learn.proto.messages.Added" = scalapb
          "learn.proto.messages.AddedResult" = scalapb
          "learn.proto.messages.Subtracted" = scalapb
          "learn.proto.messages.SubtractedResult" = scalapb

        }
      }
      remote {
        netty.tcp {
          # The port is not set here; each entry point sets akka.remote.netty.tcp.port itself.
          hostname = "127.0.0.1"
        }
      }
    }

    main/protobuf/messages.proto

    syntax = "proto3";
    
    // Brought in from scalapb-runtime
    import "scalapb/scalapb.proto";
    import "google/protobuf/wrappers.proto";
    
    package learn.proto;
    
    // Request carrying the two operands of an addition.
    message Added {
    
        int32 nbr1 = 1;
        int32 nbr2 = 2;
    }
    
    // Request carrying the two operands of a subtraction.
    message Subtracted {
        int32 nbr1 = 1;
        int32 nbr2 = 2;
    }
    
    // Reply to Added: both operands plus the computed result.
    message AddedResult {
        int32 nbr1 = 1;
        int32 nbr2 = 2;
        int32 result = 3;
    }
    
    // Reply to Subtracted: both operands plus the computed result.
    message SubtractedResult {
        int32 nbr1 = 1;
        int32 nbr2 = 2;
        int32 result = 3;
    }
    

    remote/Calculator.scala

    package akka.protobuf.calculator
    import akka.actor._
    import com.typesafe.config.ConfigFactory
    import learn.proto.messages._
    
    /**
      * Actor that answers `Added` / `Subtracted` requests by replying to the sender
      * with the matching `AddedResult` / `SubtractedResult`.
      */
    class Calculator extends Actor with ActorLogging {

      override def receive: Receive = {
        case Added(a,b) =>
          // Template + arguments: the text is only assembled when INFO logging is enabled.
          log.info("Calculating {} + {}", a, b)
          sender() ! AddedResult(a,b,a+b)
        case Subtracted(a,b) =>
          log.info("Calculating {} - {}", a, b)
          sender() ! SubtractedResult(a,b,a-b)
      }

    }
    
    /** Companion holding the `Props` factory for [[Calculator]]. */
    object Calculator {
      /** Props that create a fresh `Calculator` instance. */
      def props: Props = Props(new Calculator)
    }
    
    /** Entry point that starts the actor system hosting the `calculator` actor. */
    object CalculatorStarter extends App {

      // Fix the remoting port here; everything else comes from the loaded configuration.
      val config = ConfigFactory
        .parseString("akka.remote.netty.tcp.port=2552")
        .withFallback(ConfigFactory.load())

      val calcSystem = ActorSystem("calcSystem",config)

      calcSystem.actorOf(Calculator.props, name = "calculator")

      println("press any key to end program ...")

      // Block until a line arrives on stdin, then shut the actor system down.
      scala.io.StdIn.readLine()
      calcSystem.terminate()

    }

    CalcService.scala

    package akka.protobuf.calcservice
    import akka.actor._
    import learn.proto.messages._
    import scala.concurrent.duration._
    
    
    
    /**
      * Local proxy for the calculator actor found at `path`.
      *
      * Starts in the `identifying` state, where it resolves `path` to an `ActorRef`
      * through `Identify` / `ActorIdentity`. It then switches to `calculating`, where
      * requests are forwarded to that ref and the replies are logged.
      *
      * @param path actor path used for the actor selection and as the Identify correlation id
      */
    class CalcRunner(path: String) extends Actor with ActorLogging {
      sendIdentifyRequest()

      /** Sends `Identify` to the selection at `path` and schedules a `ReceiveTimeout` to self after 3 seconds. */
      def sendIdentifyRequest(): Unit = {
        context.actorSelection(path) ! Identify(path)
        import context.dispatcher
        context.system.scheduler.scheduleOnce(3.seconds, self, ReceiveTimeout)
      }

      def receive = identifying

      /** Behaviour while the calculator's `ActorRef` is still unknown. */
      def identifying : Receive = {
        case ActorIdentity(calcPath,Some(calcRef)) if (path.equals(calcPath)) =>
          log.info("Remote calculator started!")
          context.watch(calcRef)
          context.become(calculating(calcRef))
        case ActorIdentity(_,None) =>
          log.info("Remote calculator not found!")
        case ReceiveTimeout =>
          // The scheduled timeout fired while still unidentified: ask again.
          sendIdentifyRequest()
        case s @ _ =>
          log.info(s"Remote calculator not ready. [$s]")
      }

      /**
        * Behaviour once the calculator has been resolved.
        *
        * @param calculator the watched calculator ref that requests are forwarded to
        */
      def calculating(calculator: ActorRef) : Receive = {
        case (op : Added) => calculator ! op
        case (op : Subtracted) => calculator ! op

        case AddedResult(a,b,r)  =>
          log.info(s"$a + $b = $r")
        case SubtractedResult(a,b,r) =>
          log.info(s"$a - $b = $r")

        // Backticks compare against the `calculator` parameter instead of binding a
        // new variable that shadows it.
        case Terminated(`calculator`) =>
          log.info("Remote calculator terminated, restarting ...")
          sendIdentifyRequest()
          context.become(identifying)

        case ReceiveTimeout => //nothing
      }

    }
    
    /** Companion holding the `Props` factory for [[CalcRunner]]. */
    object CalcRunner {
      /** Props that create a `CalcRunner` bound to the given actor `path`. */
      def props(path: String): Props = Props(new CalcRunner(path))
    }

    Main.scala

    package akka.protobuf.demo
    import akka.actor._
    import akka.util.Timeout
    import com.typesafe.config.ConfigFactory
    import akka.protobuf.calcservice._
    
    import scala.concurrent.duration._
    import scala.util._
    import learn.proto.messages._
    
    /**
      * Local-side entry point: starts an actor system on port 2551, creates a
      * `CalcRunner` pointing at the calculator path, and sends it a random
      * `Added` or `Subtracted` request every second until a line is read from stdin.
      */
    object Main extends App {

      val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=2551")
        .withFallback(ConfigFactory.load())

      val calcSystem = ActorSystem("calcSystem",config)

      val calcPath = "akka.tcp://calcSystem@127.0.0.1:2552/user/calculator"

      val calculator = calcSystem.actorOf(CalcRunner.props(calcPath),"calcRunner")


      println("Calculator started ...")

      import calcSystem.dispatcher

      // Keep the handle so the periodic job can be stopped on shutdown.
      private val ticker = calcSystem.scheduler.schedule(1.second, 1.second) {
        if (Random.nextInt(100) % 2 == 0)
          calculator ! Added(Random.nextInt(100), Random.nextInt(100))
        else
          calculator ! Subtracted(Random.nextInt(100), Random.nextInt(100))
      }

      // Block until a line is read, then stop the job and the actor system; without
      // this the system keeps running and sending requests after readLine returns.
      scala.io.StdIn.readLine()
      ticker.cancel()
      calcSystem.terminate()

    }

    ProtobufferSerializer.scala

    package akka.protobuf.serializer
    
    import akka.serialization.SerializerWithStringManifest
    import learn.proto.messages._
    
    
    /**
      * Akka serializer for the four ScalaPB-generated calculator messages
      * (`Added`, `Subtracted`, `AddedResult`, `SubtractedResult`).
      *
      * The manifest is the message's class name; `fromBinary` uses it to pick the
      * companion `parseFrom` to call.
      */
    class ProtobufSerializer extends SerializerWithStringManifest{

      // Serializer id. NOTE(review): must not collide with any other serializer
      // registered in the actor system — confirm against the Akka serialization docs.
      def identifier: Int = 101110116

      override def manifest(o: AnyRef): String = o.getClass.getName
      final val AddedManifest = classOf[Added].getName
      final val SubtractedManifest = classOf[Subtracted].getName
      final val AddedResultManifest = classOf[AddedResult].getName
      final val SubtractedResultManifest = classOf[SubtractedResult].getName


      /**
        * Rebuilds a message from its protobuf bytes.
        *
        * @param bytes    protobuf-encoded payload
        * @param manifest class name written by [[manifest]]
        * @throws java.io.NotSerializableException if the manifest is not one of the four known messages
        */
      override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {

        println("inside fromBinary"+manifest)

        manifest match {
          case AddedManifest => Added.parseFrom(bytes)
          case SubtractedManifest => Subtracted.parseFrom(bytes)
          case AddedResultManifest => AddedResult.parseFrom(bytes)
          case SubtractedResultManifest => SubtractedResult.parseFrom(bytes)
          // Fail with a descriptive serialization error instead of a bare MatchError.
          case unknown =>
            throw new java.io.NotSerializableException(
              s"Unable to deserialize manifest [$unknown] in [${getClass.getName}]")
        }
      }

      /**
        * Encodes one of the four known messages to its protobuf bytes.
        *
        * @throws IllegalArgumentException if `o` is not one of the four known messages
        */
      override def toBinary(o: AnyRef): Array[Byte] = {

        println("inside toBinary ")
        o match {
          case a: Added => a.toByteArray
          case s :Subtracted => s.toByteArray
          case aR: AddedResult => aR.toByteArray
          case sR: SubtractedResult => sR.toByteArray
          case other =>
            throw new IllegalArgumentException(
              s"Unable to serialize object of type [${other.getClass.getName}] in [${getClass.getName}]")
        }
      }
    }

     

  • 相关阅读:
    使用 typeScript 规范代码
    图片 剪切 上传
    hybrid
    resful
    区块链
    前端数据采集 埋点 追踪用户系列行为
    kafka生产消息的速度跟什么有关?
    引用:实际数据库需求变化及演变
    HBase学习
    使用scala开发spark入门总结
  • 原文地址:https://www.cnblogs.com/tiger-xc/p/8974317.html
Copyright © 2011-2022 走看看