  • ScalaPB(3): gRPC streaming

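    Following the gRPC unary services covered in the previous post, this post introduces gRPC streaming in its three forms: Server-Streaming, Client-Streaming, and Bidirectional-Streaming. We start by describing a Server-Streaming service with IDL in the .proto file: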

    /*
     *  responding stream of increment results
     */
    service SumOneToMany {
      rpc AddOneToMany(SumRequest) returns (stream SumResponse) {}
    }
    
    message SumRequest {
      int32 toAdd = 1;
    }
    
    message SumResponse {
      int32 currentResult = 1;
    }

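    In the SumOneToMany service, the AddOneToMany function takes a single SumRequest and returns a stream of SumResponse. It is that simple. Compilation generates the file SumOneToManyGrpc.scala, which provides the API for the RPC operations. Let's see what Scala function protoc produces from the service function described in IDL: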

    def addOneToMany(request: SumRequest, responseObserver: StreamObserver[SumResponse]): Unit 

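    Calling the Scala function addOneToMany requires a SumRequest and a StreamObserver[SumResponse], so the caller has to prepare both arguments. The caller builds the StreamObserver beforehand and passes it in with the call; the results the server streams back are then delivered to the caller through its callbacks. gRPC implements data streaming through instances of the StreamObserver type. An example of constructing one: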

        val responseObserver = new StreamObserver[SumResponse] {
          def onError(t: Throwable): Unit = println(s"ON_ERROR: $t")
          def onCompleted(): Unit         = println("ON_COMPLETED")
          def onNext(value: SumResponse): Unit =
            println(s"ON_NEXT: Current sum: ${value.currentResult}")
        }

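    The server keeps sending results back to the client through onNext; they arrive at the client because this responseObserver was built on the client side. Below is the implementation of SumOneToMany: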

      class SumOne2ManyService extends SumOneToManyGrpc.SumOneToMany {
        override def addOneToMany(request: SumRequest, responseObserver: StreamObserver[SumResponse]): Unit = {
          val currentSum: AtomicInt = Atomic(0)
          // emit 1, 2, ..., request.toAdd, one response per step
          (1 to request.toAdd).foreach { _ =>
              responseObserver.onNext(SumResponse().withCurrentResult(currentSum.incrementAndGet()))
          }
          Thread.sleep(1000)     //delay and then finish
          responseObserver.onCompleted()
        }
      }

    The addOneToMany service function returns the numbers from 1 to request.toAdd to the caller one at a time through responseObserver. The client calls the service as follows:

        // get async stub
        val client: SumOneToManyGrpc.SumOneToManyStub = SumOneToManyGrpc.stub(channel)
        // prepare stream observer
        val streamObserver = new StreamObserver[SumResponse] {
          override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
          override def onCompleted(): Unit = println("Done incrementing !!!")
          override def onNext(value: SumResponse): Unit = println(s"current value: ${value.currentResult}")
        }
        // call service with stream observer
        client.addOneToMany(SumRequest().withToAdd(6),streamObserver)
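
    Because the call is asynchronous, the client must stay alive until the stream ends. As a hedged alternative to blocking on console input, the sketch below waits on a CountDownLatch that is released by onCompleted or onError. To stay runnable without gRPC on the classpath it uses a local StreamObserver trait that only mirrors the three callbacks of io.grpc.stub.StreamObserver; CollectingObserver and fakeAddOneToMany are hypothetical names, and the fake call merely stands in for the real stub.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit, TimeoutException}
import scala.collection.mutable.ListBuffer

object LatchObserverSketch {
  // Local stand-in (assumption): mirrors the callbacks of io.grpc.stub.StreamObserver.
  trait StreamObserver[T] {
    def onNext(value: T): Unit
    def onError(t: Throwable): Unit
    def onCompleted(): Unit
  }

  // Hypothetical helper: records every value and releases a latch when the stream ends.
  final class CollectingObserver[T] extends StreamObserver[T] {
    private val buffer = ListBuffer.empty[T]
    private val done   = new CountDownLatch(1)
    @volatile private var failure: Option[Throwable] = None

    override def onNext(value: T): Unit = { buffer.synchronized { buffer += value }; () }
    override def onError(t: Throwable): Unit = { failure = Some(t); done.countDown() }
    override def onCompleted(): Unit = done.countDown()

    // Blocks until the stream ends or the timeout expires.
    def await(timeoutSeconds: Long): Either[Throwable, List[T]] =
      if (!done.await(timeoutSeconds, TimeUnit.SECONDS))
        Left(new TimeoutException("stream did not finish in time"))
      else
        failure.toLeft(buffer.synchronized(buffer.toList))
  }

  // Stand-in for the remote call: emits 1..toAdd from another thread, then completes.
  def fakeAddOneToMany(toAdd: Int, observer: StreamObserver[Int]): Unit = {
    val worker = new Thread(() => {
      (1 to toAdd).foreach(observer.onNext)
      observer.onCompleted()
    })
    worker.start()
  }

  def run(): Either[Throwable, List[Int]] = {
    val observer = new CollectingObserver[Int]
    fakeAddOneToMany(6, observer)
    observer.await(5)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

    With the real stub, the same observer shape would be passed to client.addOneToMany, and await would replace the wait on console input.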

    The IDL for the Client-Streaming service is:

    /*
     *  responding a result from a request of stream of numbers
     */
    service SumManyToOne {
      rpc AddManyToOne(stream SumRequest ) returns (SumResponse) {}
    }

    It takes a stream of SumRequest and returns a single SumResponse. The addManyToOne function in the Scala code generated by ScalaPB has this signature:

    def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest]

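    The caller supplies a StreamObserver[SumResponse] to receive the result, and the function returns the StreamObserver[SumRequest] that the client uses to send the request stream. Note: although AddManyToOne returns a single SumResponse in the .proto file, the generated Scala function exposes a StreamObserver[SumResponse], so keep in mind that onNext may be called only once on it. Here is the implementation of this service: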

      class Many2OneService extends SumManyToOneGrpc.SumManyToOne {
         override def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
           new StreamObserver[SumRequest] {
             // running total, private to this call
             val currentSum: AtomicInt = Atomic(0)
             override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
             override def onCompleted(): Unit = println("Done summing!")
             override def onNext(value: SumRequest): Unit = {
               // a value <= 0 ends the input: it is added and the single response is sent
               if (value.toAdd > 0)
                  currentSum.add(value.toAdd)
               else
                  responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
             }
           }
       }
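
    The implementation above replies when it sees a value that is not positive. A more conventional shape for client streaming, shown here only as a hedged sketch, is to accumulate in onNext and send the single response from onCompleted, which runs once the client has finished its stream. The StreamObserver trait and the SumRequest and SumResponse case classes below are local stand-ins (assumptions) for the gRPC and ScalaPB types, so the sketch runs without those libraries.

```scala
import java.util.concurrent.atomic.AtomicInteger

object ReplyOnCompletedSketch {
  // Local stand-ins (assumptions) for io.grpc.stub.StreamObserver and the generated messages.
  trait StreamObserver[T] {
    def onNext(value: T): Unit
    def onError(t: Throwable): Unit
    def onCompleted(): Unit
  }
  final case class SumRequest(toAdd: Int)
  final case class SumResponse(currentResult: Int)

  // Same shape as the generated addManyToOne: takes the response observer,
  // returns the observer that receives the request stream.
  def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
    new StreamObserver[SumRequest] {
      private val sum = new AtomicInteger(0) // state belongs to this call only

      override def onNext(value: SumRequest): Unit = { sum.addAndGet(value.toAdd); () }
      override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      override def onCompleted(): Unit = {
        responseObserver.onNext(SumResponse(sum.get())) // exactly one response
        responseObserver.onCompleted()                  // then close the response side
      }
    }

  // Drives the observer the way a client would and reports what came back.
  def run(numbers: List[Int]): (List[Int], Boolean) = {
    var results   = List.empty[Int]
    var completed = false
    val responseObserver = new StreamObserver[SumResponse] {
      override def onNext(value: SumResponse): Unit = results = value.currentResult :: results
      override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      override def onCompleted(): Unit = completed = true
    }
    val requestObserver = addManyToOne(responseObserver)
    numbers.foreach(n => requestObserver.onNext(SumRequest(n)))
    requestObserver.onCompleted() // the client signals the end of its stream
    (results.reverse, completed)
  }

  def main(args: Array[String]): Unit = println(run(List(2, 5, 8, 4)))
}
```

    With this shape no sentinel value is needed; the client ends the request stream by calling onCompleted on the request observer.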

    A sample client call:

        //pass to server for result
        val respStreamObserver = new StreamObserver[SumResponse] {
          override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
          override def onCompleted(): Unit = println("Done responding!")
          override def onNext(value: SumResponse): Unit =
            println(s"Result: ${value.currentResult}")
        }
        //get async stub
        val client = SumManyToOneGrpc.stub(channel)
    
        //get request stream observer from server
        val reqStreamObserver = client.addManyToOne(respStreamObserver)
    
        // the trailing 0 tells the server to reply with the total
        List(2,5,8,4,0).foreach { n =>
          reqStreamObserver.onNext(SumRequest(n))
        }
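
    Since a client-streaming call yields a single result, it can be convenient to expose that result as a Future rather than handling callbacks directly. The following is a minimal sketch under assumptions: PromiseObserver is a hypothetical adapter, the StreamObserver trait is a local stand-in mirroring io.grpc.stub.StreamObserver, and the server's reply is simulated by calling the callbacks by hand.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object PromiseObserverSketch {
  // Local stand-in (assumption): mirrors the callbacks of io.grpc.stub.StreamObserver.
  trait StreamObserver[T] {
    def onNext(value: T): Unit
    def onError(t: Throwable): Unit
    def onCompleted(): Unit
  }

  // Hypothetical adapter: completes a Promise with the first value it receives.
  final class PromiseObserver[T] extends StreamObserver[T] {
    private val promise = Promise[T]()
    def future: Future[T] = promise.future

    override def onNext(value: T): Unit = { promise.trySuccess(value); () }
    override def onError(t: Throwable): Unit = { promise.tryFailure(t); () }
    // Completion without any value is reported as a failure; after a value it is a no-op.
    override def onCompleted(): Unit = {
      promise.tryFailure(new NoSuchElementException("completed without a response"))
      ()
    }
  }

  def run(): Int = {
    val observer = new PromiseObserver[Int]
    // Simulated server reply: one result, then completion.
    observer.onNext(19)
    observer.onCompleted()
    Await.result(observer.future, 1.second)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

    In a real client, an observer of this shape typed to SumResponse would be the argument to client.addManyToOne, and the caller would compose on its future.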

    The IDL for Bidirectional-Streaming is:

    /*
     * Sums up numbers received from the client and returns the current result after each received request.
     */
    service SumInter {
      rpc AddInter(stream SumRequest) returns (stream SumResponse) {}
    }

    The SumInter service describes an operation that takes a stream of SumRequest and returns a stream of SumResponse. The corresponding generated Scala function is:

    def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest]
    

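    This signature is the same as that of the Client-Streaming service function. Here, however, we can send multiple SumResponse messages through responseObserver. The service is implemented like this: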

      class Many2ManyService extends SumInterGrpc.SumInter {
        override def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
          new StreamObserver[SumRequest] {
            val currentSum: AtomicInt = Atomic(0)
            override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
            override def onCompleted(): Unit = println("Done requesting!")
            override def onNext(value: SumRequest): Unit = {
              responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
            }
          }
      }

    We can call responseObserver.onNext any number of times. The client source code:

        //create stream observer for result stream
        val responseObserver = new StreamObserver[SumResponse] {
          def onError(t: Throwable): Unit = println(s"ON_ERROR: $t")
          def onCompleted(): Unit         = println("ON_COMPLETED")
          def onNext(value: SumResponse): Unit =
            println(s"ON_NEXT: Current sum: ${value.currentResult}")
        }
        //get request container
        val requestObserver = client.addInter(responseObserver)
    
        scheduler.scheduleWithFixedDelay(0.seconds, 1.seconds) {
          val toBeAdded = Random.nextInt(11)
          println(s"Adding number: $toBeAdded")
          requestObserver.onNext(SumRequest(toBeAdded))
        }
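
    In the service implementation, currentSum is created inside the StreamObserver that addInter returns, so every call gets its own running total. The sketch below illustrates that per-call state with two independent simulated calls. It is a sketch under assumptions: the StreamObserver trait is a local stand-in mirroring io.grpc.stub.StreamObserver, plain Int values replace the SumRequest and SumResponse messages, and collecting is a hypothetical helper.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ListBuffer

object PerCallStateSketch {
  // Local stand-in (assumption): mirrors the callbacks of io.grpc.stub.StreamObserver.
  trait StreamObserver[T] {
    def onNext(value: T): Unit
    def onError(t: Throwable): Unit
    def onCompleted(): Unit
  }

  // Same shape as addInter: every invocation builds a fresh observer with its own sum.
  def addInter(responseObserver: StreamObserver[Int]): StreamObserver[Int] =
    new StreamObserver[Int] {
      private val currentSum = new AtomicInteger(0)
      override def onNext(value: Int): Unit =
        responseObserver.onNext(currentSum.addAndGet(value)) // one response per request
      override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      override def onCompleted(): Unit = responseObserver.onCompleted()
    }

  // Hypothetical helper: a response observer that appends every value to a buffer.
  def collecting(sink: ListBuffer[Int]): StreamObserver[Int] =
    new StreamObserver[Int] {
      override def onNext(value: Int): Unit = { sink += value; () }
      override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
      override def onCompleted(): Unit = ()
    }

  def run(): (List[Int], List[Int]) = {
    val a = ListBuffer.empty[Int]
    val b = ListBuffer.empty[Int]
    val callA = addInter(collecting(a))
    val callB = addInter(collecting(b))
    List(1, 2, 3).foreach(callA.onNext)
    List(10, 20).foreach(callB.onNext)
    callA.onCompleted()
    callB.onCompleted()
    (a.toList, b.toList)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

    A counter declared on the service class instead would be shared by all calls, so concurrent clients would see each other's totals.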

    The complete source code for this demonstration follows:

    project/scalapb.sbt

    addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
    libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"

    build.sbt

    import scalapb.compiler.Version.scalapbVersion
    import scalapb.compiler.Version.grpcJavaVersion
    
    name := "learn-gRPC"
    
    version := "0.1"
    
    scalaVersion := "2.12.6"
    
    libraryDependencies ++= Seq(
      "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
      "io.grpc" % "grpc-netty" % grpcJavaVersion,
      "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
      "io.monix" %% "monix" % "2.3.0"
    )
    
    PB.targets in Compile := Seq(
      scalapb.gen() -> (sourceManaged in Compile).value
    )

    src/main/protobuf/sum.proto

    syntax = "proto3";
    
    package learn.grpc.services;
    
    /*
     *  responding stream of increment results
     */
    service SumOneToMany {
      rpc AddOneToMany(SumRequest) returns (stream SumResponse) {}
    }
    
    /*
     *  responding a result from a request of stream of numbers
     */
    service SumManyToOne {
      rpc AddManyToOne(stream SumRequest ) returns (SumResponse) {}
    }
    
    /*
     * Sums up numbers received from the client and returns the current result after each received request.
     */
    service SumInter {
      rpc AddInter(stream SumRequest) returns (stream SumResponse) {}
    }
    
    message SumRequest {
      int32 toAdd = 1;
    }
    
    message SumResponse {
      int32 currentResult = 1;
    }

    gRPCServer.scala

    package learn.grpc.server
    import io.grpc.{ServerBuilder,ServerServiceDefinition}
    
    trait gRPCServer {
      def runServer(service: ServerServiceDefinition): Unit = {
        val server = ServerBuilder
          .forPort(50051)
          .addService(service)
          .build
          .start
    
        // make sure our server is stopped when jvm is shut down
        Runtime.getRuntime.addShutdownHook(new Thread() {
          override def run(): Unit = server.shutdown()
        })
    
        server.awaitTermination()
      }
    
    }

    OneToManyServer.scala

    package learn.grpc.sum.one2many.server
    import io.grpc.stub.StreamObserver
    import learn.grpc.services.sum._
    import monix.execution.atomic.{Atomic,AtomicInt}
    import learn.grpc.server.gRPCServer
    
    object One2ManyServer extends gRPCServer {
    
      class SumOne2ManyService extends SumOneToManyGrpc.SumOneToMany {
        override def addOneToMany(request: SumRequest, responseObserver: StreamObserver[SumResponse]): Unit = {
          val currentSum: AtomicInt = Atomic(0)
          // emit 1, 2, ..., request.toAdd, one response per step
          (1 to request.toAdd).foreach { _ =>
              responseObserver.onNext(SumResponse().withCurrentResult(currentSum.incrementAndGet()))
          }
          Thread.sleep(1000)     //delay and then finish
          responseObserver.onCompleted()
        }
      }
    
      def main(args: Array[String]) = {
        val svc = SumOneToManyGrpc.bindService(new SumOne2ManyService, scala.concurrent.ExecutionContext.global)
        runServer(svc)
      }
    
    }

    OneToManyClient.scala

    package learn.grpc.sum.one2many.client
    import io.grpc.stub.StreamObserver
    import learn.grpc.services.sum._
    
    object One2ManyClient {
      def main(args: Array[String]): Unit = {
    
        //build connection channel
        val channel = io.grpc.ManagedChannelBuilder
          .forAddress("localhost",50051)
          .usePlaintext(true)
          .build()
    
        // get async stub
        val client: SumOneToManyGrpc.SumOneToManyStub = SumOneToManyGrpc.stub(channel)
        // prepare stream observer
        val streamObserver = new StreamObserver[SumResponse] {
          override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
          override def onCompleted(): Unit = println("Done incrementing !!!")
          override def onNext(value: SumResponse): Unit = println(s"current value: ${value.currentResult}")
        }
        // call service with stream observer
        client.addOneToMany(SumRequest().withToAdd(6),streamObserver)
    
        // wait for async execution
        scala.io.StdIn.readLine()
      }
    }

    ManyToOneServer.scala

    package learn.grpc.sum.many2one.server
    import io.grpc.stub.StreamObserver
    import learn.grpc.services.sum._
    import learn.grpc.server.gRPCServer
    import monix.execution.atomic.{Atomic,AtomicInt}
    
    object Many2OneServer extends gRPCServer {
       class Many2OneService extends SumManyToOneGrpc.SumManyToOne {
         override def addManyToOne(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
           new StreamObserver[SumRequest] {
             // running total, private to this call
             val currentSum: AtomicInt = Atomic(0)
             override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
             override def onCompleted(): Unit = println("Done summing!")
             override def onNext(value: SumRequest): Unit = {
               // a value <= 0 ends the input: it is added and the single response is sent
               if (value.toAdd > 0)
                  currentSum.add(value.toAdd)
               else
                  responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
             }
           }
       }
    
       def main(args: Array[String]): Unit = {
         val svc = SumManyToOneGrpc.bindService(new Many2OneService,scala.concurrent.ExecutionContext.global)
         runServer(svc)
       }
    }

    ManyToOneClient.scala

    package learn.grpc.sum.many2one.client
    import io.grpc.stub.StreamObserver
    import learn.grpc.services.sum._
    
    object Many2OneClient {
      def main(args: Array[String]): Unit = {
        //build channel
        val channel = io.grpc.ManagedChannelBuilder
          .forAddress("localhost",50051)
          .usePlaintext(true)
          .build()
        //pass to server for result
        val respStreamObserver = new StreamObserver[SumResponse] {
          override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
          override def onCompleted(): Unit = println("Done responding!")
          override def onNext(value: SumResponse): Unit =
            println(s"Result: ${value.currentResult}")
        }
        //get async stub
        val client = SumManyToOneGrpc.stub(channel)
    
        //get request stream observer from server
        val reqStreamObserver = client.addManyToOne(respStreamObserver)
    
        // the trailing 0 tells the server to reply with the total
        List(2,5,8,4,0).foreach { n =>
          reqStreamObserver.onNext(SumRequest(n))
        }
        scala.io.StdIn.readLine()
      }
    }

    ManyToManyServer.scala 

    package learn.grpc.sum.many2many.server
    import io.grpc.stub.StreamObserver
    import learn.grpc.services.sum._
    import learn.grpc.server.gRPCServer
    import monix.execution.atomic.{Atomic,AtomicInt}
    object Many2ManyServer extends gRPCServer {
      class Many2ManyService extends SumInterGrpc.SumInter {
        override def addInter(responseObserver: StreamObserver[SumResponse]): StreamObserver[SumRequest] =
          new StreamObserver[SumRequest] {
            val currentSum: AtomicInt = Atomic(0)
    
            override def onError(t: Throwable): Unit = println(s"error: ${t.getMessage}")
    
            override def onCompleted(): Unit = println("Done requesting!")
    
            override def onNext(value: SumRequest): Unit = {
              responseObserver.onNext(SumResponse(currentSum.addAndGet(value.toAdd)))
            }
          }
      }
      def main(args: Array[String]): Unit = {
        val svc = SumInterGrpc.bindService(new Many2ManyService, scala.concurrent.ExecutionContext.global)
        runServer(svc)
      }
    
    }

    ManyToManyClient.scala

    package learn.grpc.sum.many2many.client
    import monix.execution.Scheduler.{global => scheduler}
    import learn.grpc.services.sum._
    
    import scala.concurrent.duration._
    import scala.util.Random
    import io.grpc._
    import io.grpc.stub.StreamObserver
    
    object Many2ManyClient {
      def main(args: Array[String]): Unit = {
        val channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext(true).build
        val client  = SumInterGrpc.stub(channel)
        //create stream observer for result stream
        val responseObserver = new StreamObserver[SumResponse] {
          def onError(t: Throwable): Unit = println(s"ON_ERROR: $t")
          def onCompleted(): Unit         = println("ON_COMPLETED")
          def onNext(value: SumResponse): Unit =
            println(s"ON_NEXT: Current sum: ${value.currentResult}")
        }
        //get request container
        val requestObserver = client.addInter(responseObserver)
    
        scheduler.scheduleWithFixedDelay(0.seconds, 1.seconds) {
          val toBeAdded = Random.nextInt(11)
          println(s"Adding number: $toBeAdded")
          requestObserver.onNext(SumRequest(toBeAdded))
        }
    
        scala.io.StdIn.readLine()
      }
    
    }
  • Original article: https://www.cnblogs.com/tiger-xc/p/9023937.html