zoukankan      html  css  js  c++  java
  • ScalaPB(5):用akka-stream实现reactive-gRPC

      在前面几篇讨论里我们介绍了scala-gRPC的基本功能和使用方法,我们基本确定了选择gRPC作为一种有效的内部系统集成工具,主要因为下面gRPC支持的几种服务模式:

    1、Unary-Call:独立的一对client-request/server-response,是我们常用的http交互模式
    
    2、Server-Streaming:client发出一个request后从server端接收一串多个response
    
    3、Client-Streaming:client向server发送一串多个request后从server接收一个response
    
    4、Bidirectional-Streaming:由client首先发送request启动连接,然后在这个连接上两端可以不断交互信息。

    很明显,gRPC支持双向的streaming。那么如果能把gRPC中ListenableFuture和StreamObserver这两种类型转成akka-stream的基本类型应该就能够实现所谓的reactive-gRPC了。如果我们能用akka-stream编程方式实现gRPC服务调用的话,可能会遭遇下面的场景:在服务端我们只需要实现一种akka-stream的Flow把进来的request转化成出去的response,如下:

    // Unary case
    Flow[Request].map(computeResponse)
     
    // Server streaming
    Flow[Request].flatMapConcat(computeResponses)
     
    // Client streaming
    Flow[Request].fold(defaultResponse)(computeResponse)
     
    // Bidirectional streaming
    Flow[Request].flatMapConcat(computeResponses)

    当然,这是个akka-stream Flow,我们可以在这个Flow里调用任何akka-stream提供的功能,如:

    Flow[Request]
      .throttle(1, 10.millis, 1, ThrottleMode.Shaping)
      .map(computeResponse)

    在客户端我们可以直接经客户端stub调用Flow,如下:

    Source
      .single(request)
      .via(stub.doSomething)
      .runForeach(println)

    刚好,beyond-the-lines gRPCAkkaStream开源项目提供这么一种gRPC StreamObserver到aka-stream Flow转换桥梁。下面是gRPCAkkaStream的使用示范。先从Unary-Call开始:下面是.proto文件的IDL服务描述:

    syntax = "proto3";
    package learn.grpc.akka.stream.services;
    message NumPair {
       int32 num1 = 1;
       int32 num2 = 2;
    }
    message Num {
       int32 num = 1;
    }
    message SumResult {
       int32 result = 1;
    }
    service SumNumbers {
       rpc SumPair(NumPair) returns (SumResult) {}
    }

    我们看看编译后自动产生的SumGrpcAkkaStream.scala文件中一些相关类型和函数:

    服务界面描述:

    trait SumNumbers extends AbstractService {
        override def serviceCompanion = SumNumbers
        def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed]
    }

    我们看到服务函数sumPair是一个akka-stream Fow[NumPair,SumResult,NotUsed]。下面是具体实现SumNumbers.sumPair代码:

    class gRPCAkkaStreamService extends SumGrpcAkkaStream.SumNumbers {
      val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamService].getName)
        override def sumPair: Flow[NumPair, SumResult, NotUsed] = {
          logger.info(s"*** calling sumPair ...  ***")
          Flow[NumPair].map {
            case NumPair(a,b) => {
              logger.info(s"serving ${a} + ${b} = ???")
              SumResult(a + b)
            }
          }
        }

    产生的客户端stub源代码如下:

     class SumNumbersStub(
        channel: Channel,
        options: CallOptions = CallOptions.DEFAULT
      ) extends AbstractStub[SumNumbersStub](channel, options) with SumNumbers {
        override def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed] =
          Flow[learn.grpc.akka.stream.services.sum.NumPair].flatMapConcat(request =>
            Source.fromFuture(
              Grpc.guavaFuture2ScalaFuture(
                ClientCalls.futureUnaryCall(channel.newCall(METHOD_SUM_PAIR, options), request)
              )
            )
          )
      
     def stub(channel: Channel): SumNumbersStub = new SumNumbersStub(channel)

    我们可以通过stub来调用sumPair方法,如下:

      val channel = ManagedChannelBuilder
          .forAddress(host,port)
          .usePlaintext(true)
          .build()
    
      val stub = SumGrpcAkkaStream.stub(channel)
    
      def addPair(num1: Int, num2: Int): Source[String,NotUsed] = {
        logger.info(s"Requesting to add $num1, $num2")
        Source
          .single(NumPair(num1,num2))
          .via(stub.sumPair)
          .map(r => s"the result: ${r.result}")
      }

    下面是Unary-Call的具体调用方式:

    object UnaryCallClient extends App {
      implicit val system = ActorSystem("UnaryClient")
      implicit val mat = ActorMaterializer.create(system)
    
      val client = new gRPCAkkaStreamClient("localhost", 50051)
    
      client.addPair(29,33).runForeach(println)
    
      scala.io.StdIn.readLine()
      mat.shutdown()
      system.terminate()
    
    }

    在Server-Streaming中一个request返回的是stream of responses。IDL的描述如下:

    service SumNumbers {
       rpc SumPair(NumPair) returns (SumResult) {}
       rpc GenIncsFrom(Num) returns (stream Num) {}
    }

    编译后自动产生的service trait如下:

     trait SumNumbers extends AbstractService {
        override def serviceCompanion = SumNumbers
        def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed]
        def genIncsFrom: Flow[learn.grpc.akka.stream.services.sum.Num, learn.grpc.akka.stream.services.sum.Num, NotUsed]
      }

    这个服务函数genIncsFrom是Flow[Num,Num,NotUsed],它的具体实现如下:

    class gRPCAkkaStreamService extends SumGrpcAkkaStream.SumNumbers {
      val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamService].getName)
      override def genIncsFrom: Flow[Num, Num, NotUsed] = {
        logger.info("*** calling genIncsFrom")
        Flow[Num].mapConcat {
          n => (1 to n.num).map {m =>
            logger.info(s"genIncFrom producing num: ${m}")
            Num(m)
          }
        }
      }
    }

    因为输出response是一个stream,可以用mapConcat展平Seq来产生一个。在客户方调用服务函数genIncsFrom的方式如下:

      def genIncNumbers(len: Int): Source[Int,NotUsed] = {
        logger.info(s"Requesting to produce ${len} inc numbers")
        Source
          .single(Num(len))
          .via(stub.genIncsFrom)
          .map(n => n.num)
      }

    我们还是用runForeach来运算这个Source:

    object ServerStreamingClient extends App {
      implicit val system = ActorSystem("ServerStreamingClient")
      implicit val mat = ActorMaterializer.create(system)
    
      val client = new gRPCAkkaStreamClient("localhost", 50051)
    
      client.genIncNumbers(5).runForeach(println)
    
      scala.io.StdIn.readLine()
      mat.shutdown()
      system.terminate()
    
    }

    再来看看Client-Streaming是如何通过reactive-stream实现的。IDL服务描述如下:

    service SumNumbers {
       rpc SumPair(NumPair) returns (SumResult) {}
       rpc GenIncsFrom(Num) returns (stream Num) {}
       rpc SumStreamNums(stream Num) returns (SumResult) {}
    }

    自动产生的service接口如下:

      trait SumNumbers extends AbstractService {
        override def serviceCompanion = SumNumbers
        def sumPair: Flow[learn.grpc.akka.stream.services.sum.NumPair, learn.grpc.akka.stream.services.sum.SumResult, NotUsed]
        def genIncsFrom: Flow[learn.grpc.akka.stream.services.sum.Num, learn.grpc.akka.stream.services.sum.Num, NotUsed]
        def sumStreamNums: Flow[learn.grpc.akka.stream.services.sum.Num, learn.grpc.akka.stream.services.sum.SumResult, NotUsed]
      }

    sumStreamNums Flow实现如下:

      override def sumStreamNums: Flow[Num, SumResult, NotUsed] = {
        logger.info("*** calling sumStreamNums")
        Flow[Num].fold(SumResult(0)) {
          case (a, b) =>
            logger.info(s"receiving operand ${b.num}")
            SumResult(b.num + a.result)
        }
      }

    request是一个stream,可以用aggregation来汇总成一个response。在客户端调用stub.sumStreamNums:

      def sumManyNumbers(nums: Seq[Int]): Source[String,NotUsed] = {
        logger.info(s"Requesting to sum up ${nums}")
        Source(nums.map(Num(_)).to[collection.immutable.Iterable])
          .via(stub.sumStreamNums)
          .map(r => s"the result: ${r.result}")
        }
    
    object ClientStreamingClient extends App {
      implicit val system = ActorSystem("ClientStreamingClient")
      implicit val mat = ActorMaterializer.create(system)
    
      val client = new gRPCAkkaStreamClient("localhost", 50051)
    
      client.sumManyNumbers(Seq(12,4,8,19)).runForeach(println)
    
      scala.io.StdIn.readLine()
      mat.shutdown()
      system.terminate()
    
    }

    最后我们示范一下BiDirectional-Streaming。先用IDL定义一个流输入输出的服务函数keepAdding:

    service SumNumbers {
       rpc SumPair(NumPair) returns (SumResult) {}
       rpc GenIncsFrom(Num) returns (stream Num) {}
       rpc SumStreamNums(stream Num) returns (SumResult) {}
       rpc KeepAdding(stream Num) returns (stream SumResult) {}
    }

    这个函数的实现代码: 

      override def keepAdding: Flow[Num, SumResult, NotUsed] = {
        Flow[Num].scan(SumResult(0)) {
          case (a,b) =>
            logger.info(s"receiving operand ${b.num}")
            SumResult(b.num + a.result)
        }
      }

    这个服务函数的作用是把一串输入数字逐个相加并输出当前结果。我们可以用scan来实现这样的功能。下面是客户端调用服务的示范代码:

      def ContSum(nums: Seq[Int]): Source[String,NotUsed] = {
        logger.info(s"Requesting to sum up ${nums}")
        Source(nums.map(Num(_)).to[collection.immutable.Iterable])
          .throttle(1, 500.millis, 1, ThrottleMode.shaping)
          .map { n =>
            logger.info(s"Sending number: $n")
            n
          }
          .via(stub.keepAdding)
          .map(r => s"current sum = ${r.result}")
      }

    用下面这段代码运算:

    object BiDiStreamingClient extends App {
      implicit val system = ActorSystem("BiDiStreamingClient")
      implicit val mat = ActorMaterializer.create(system)
    
      val client = new gRPCAkkaStreamClient("localhost", 50051)
    
      client.ContSum(Seq(12,4,8,19)).runForeach(println)
    
      scala.io.StdIn.readLine()
      mat.shutdown()
      system.terminate()
    
    }

    好,下面是本次讨论涉及的所有源代码:

    project/scalapb.sbt

    addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
    
    resolvers += Resolver.bintrayRepo("beyondthelines", "maven")
    
    libraryDependencies ++= Seq(
      "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1",
      "beyondthelines"         %% "grpcakkastreamgenerator" % "0.0.5"
    )

    build.sbt

    import scalapb.compiler.Version.scalapbVersion
    import scalapb.compiler.Version.grpcJavaVersion
    
    name := "gRPCAkkaStreamDemo"
    
    version := "0.1"
    
    scalaVersion := "2.12.6"
    
    resolvers += Resolver.bintrayRepo("beyondthelines", "maven")
    
    libraryDependencies ++= Seq(
      "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
      "io.grpc" % "grpc-netty" % grpcJavaVersion,
      "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
      "io.monix" %% "monix" % "2.3.0",
      // for GRPC Akkastream
      "beyondthelines"         %% "grpcakkastreamruntime" % "0.0.5"
    )
    
    PB.targets in Compile := Seq(
      scalapb.gen() -> (sourceManaged in Compile).value,
      // generate the akka stream files
      grpc.akkastreams.generators.GrpcAkkaStreamGenerator() -> (sourceManaged in Compile).value
    )

    src/main/protobuf/sum.proto

    syntax = "proto3";
    
    package learn.grpc.akka.stream.services;
    
    
    message NumPair {
       int32 num1 = 1;
       int32 num2 = 2;
    }
    
    message Num {
       int32 num = 1;
    }
    
    message SumResult {
       int32 result = 1;
    }
    
    service SumNumbers {
       rpc SumPair(NumPair) returns (SumResult) {}
       rpc GenIncsFrom(Num) returns (stream Num) {}
       rpc SumStreamNums(stream Num) returns (SumResult) {}
       rpc KeepAdding(stream Num) returns (stream SumResult) {}
    }

    src/main/scala/gRPCAkkaStreamService.scala

    package learn.grpc.akka.stream.services.impl
    
    import akka.NotUsed
    import akka.stream.scaladsl.Flow
    import learn.grpc.akka.stream.services.sum._
    import java.util.logging.Logger
    
    class gRPCAkkaStreamService extends SumGrpcAkkaStream.SumNumbers {
      val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamService].getName)
    
      override def sumPair: Flow[NumPair, SumResult, NotUsed] = {
        logger.info(s"*** calling sumPair ...  ***")
        Flow[NumPair].map {
          case NumPair(a, b) => {
            logger.info(s"serving ${a} + ${b} = ???")
            SumResult(a + b)
          }
        }
      }
    
      override def genIncsFrom: Flow[Num, Num, NotUsed] = {
        logger.info("*** calling genIncsFrom ... ***")
        Flow[Num].mapConcat {
          n =>
            (1 to n.num).map { m =>
              logger.info(s"genIncFrom producing num: ${m}")
              Num(m)
            }
        }
      }
    
      override def sumStreamNums: Flow[Num, SumResult, NotUsed] = {
        logger.info("*** calling sumStreamNums ... ***")
        Flow[Num].fold(SumResult(0)) {
          case (a, b) =>
            logger.info(s"receiving operand ${b.num}")
            SumResult(b.num + a.result)
        }
      }
    
      override def keepAdding: Flow[Num, SumResult, NotUsed] = {
        Flow[Num].scan(SumResult(0)) {
          case (a,b) =>
            logger.info(s"receiving operand ${b.num}")
            SumResult(b.num + a.result)
        }
      }
    }

    src/main/scala/gRPCAkkaStreamServer.scala

    package learn.grpc.akka.stream.server
    
    import java.util.logging.Logger
    
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import io.grpc.Server
    import learn.grpc.akka.stream.services.impl.gRPCAkkaStreamService
    import io.grpc.ServerBuilder
    import learn.grpc.akka.stream.services.sum._
    class gRPCServer(server: Server) {
    
      val logger: Logger = Logger.getLogger(classOf[gRPCServer].getName)
    
      def start(): Unit = {
        server.start()
        logger.info(s"Server started, listening on ${server.getPort}")
        sys.addShutdownHook {
          // Use stderr here since the logger may has been reset by its JVM shutdown hook.
          System.err.println("*** shutting down gRPC server since JVM is shutting down")
          stop()
          System.err.println("*** server shut down")
        }
        ()
      }
    
      def stop(): Unit = {
        server.shutdown()
      }
    
      /**
        * Await termination on the main thread since the grpc library uses daemon threads.
        */
      def blockUntilShutdown(): Unit = {
        server.awaitTermination()
      }
    }
    
    object DemoServer extends App {
      implicit val system = ActorSystem("UnaryServer")
      implicit val mat = ActorMaterializer.create(system)
      val server = new gRPCServer(
        ServerBuilder
          .forPort(50051)
            .addService(
              SumGrpcAkkaStream.bindService(
               new gRPCAkkaStreamService
              )
          ).build()
      )
      server.start()
    //  UnaryServer.blockUntilShutdown()
      scala.io.StdIn.readLine()
      mat.shutdown()
      system.terminate()
    
    }

    src/main/scala/gRPCAkkaStreamClient.scala

    package learn.grpc.akka.stream.client
    import learn.grpc.akka.stream.services.sum._
    import java.util.logging.Logger
    
    import akka.stream.scaladsl._
    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.{ActorMaterializer, ThrottleMode}
    import scala.concurrent.duration._
    import io.grpc._
    class gRPCAkkaStreamClient(host: String, port: Int) {
      val logger: Logger = Logger.getLogger(classOf[gRPCAkkaStreamClient].getName)
    
      val channel = ManagedChannelBuilder
          .forAddress(host,port)
          .usePlaintext(true)
          .build()
    
      val stub = SumGrpcAkkaStream.stub(channel)
    
      def addPair(num1: Int, num2: Int): Source[String,NotUsed] = {
        logger.info(s"Requesting to add $num1, $num2")
        Source
          .single(NumPair(num1,num2))
          .via(stub.sumPair)
          .map(r => s"the result: ${r.result}")
      }
      def genIncNumbers(len: Int): Source[Int,NotUsed] = {
        logger.info(s"Requesting to produce ${len} inc numbers")
        Source
          .single(Num(len))
          .via(stub.genIncsFrom)
          .map(n => n.num)
      }
      def sumManyNumbers(nums: Seq[Int]): Source[String,NotUsed] = {
        logger.info(s"Requesting to sum up ${nums}")
        Source(nums.map(Num(_)).to[collection.immutable.Iterable])
          .throttle(1, 500.millis, 1, ThrottleMode.shaping)
          .map { n =>
            logger.info(s"Sending number: $n")
            n
          }
          .via(stub.sumStreamNums)
          .map(r => s"the result: ${r.result}")
        }
      def ContSum(nums: Seq[Int]): Source[String,NotUsed] = {
        logger.info(s"Requesting to sum up ${nums}")
        Source(nums.map(Num(_)).to[collection.immutable.Iterable])
          .throttle(1, 500.millis, 1, ThrottleMode.shaping)
          .map { n =>
            logger.info(s"Sending number: $n")
            n
          }
          .via(stub.keepAdding)
          .map(r => s"current sum = ${r.result}")
      }
    }
    
    object UnaryCallClient extends App {
      implicit val system = ActorSystem("UnaryClient")
      implicit val mat = ActorMaterializer.create(system)
    
      val client = new gRPCAkkaStreamClient("localhost", 50051)
    
      client.addPair(29,33).runForeach(println)
    
      scala.io.StdIn.readLine()
      mat.shutdown()
      system.terminate()
    
    }
    
    object ServerStreamingClient extends App {
      implicit val system = ActorSystem("ServerStreamingClient")
      implicit val mat = ActorMaterializer.create(system)
    
      val client = new gRPCAkkaStreamClient("localhost", 50051)
    
      client.genIncNumbers(5).runForeach(println)
    
      scala.io.StdIn.readLine()
      mat.shutdown()
      system.terminate()
    
    }
    
    object ClientStreamingClient extends App {
      implicit val system = ActorSystem("ClientStreamingClient")
      implicit val mat = ActorMaterializer.create(system)
    
      val client = new gRPCAkkaStreamClient("localhost", 50051)
    
      client.sumManyNumbers(Seq(12,4,8,19)).runForeach(println)
    
      scala.io.StdIn.readLine()
      mat.shutdown()
      system.terminate()
    
    }
    
    object BiDiStreamingClient extends App {
      implicit val system = ActorSystem("BiDiStreamingClient")
      implicit val mat = ActorMaterializer.create(system)
    
      val client = new gRPCAkkaStreamClient("localhost", 50051)
    
      client.ContSum(Seq(12,4,8,19)).runForeach(println)
    
      scala.io.StdIn.readLine()
      mat.shutdown()
      system.terminate()
    
    }
  • 相关阅读:
    apache的日志切割
    实现HTTPS--Apache+Openssl
    CentOS 6.x 编译安装LAMP
    apache的域名跳转
    模型生成过程中检测到一个或多个验证错误
    电商时代已经要过去了。接下来是零售
    电商时代已经要过去了。接下来是零售
    华为手机怎么安装Google
    华为手机怎么安装Google
    table不让td中文字溢出操作方法
  • 原文地址:https://www.cnblogs.com/tiger-xc/p/9066799.html
Copyright © 2011-2022 走看看