zoukankan      html  css  js  c++  java
  • ScalaPB(2): 在scala中用gRPC实现微服务

       gRPC是google开源提供的一个RPC软件框架,它的特点是极大简化了传统RPC的开发流程和代码量,使用户可以免除许多陷阱并聚焦于实际应用逻辑中。作为一种google的最新RPC解决方案,gRPC具备了以下这些强项:

    1、gRPC在HTTP/2协议上用protobuf取代了json实现了最佳效率

    2、用IDL(Interface Definition Language),一种简单的描述语言来自动产生RPC的api源代码

    3、支持blocking/non-blocking双向数据流交互,适合程序的流程控制

    gRPC的使用非常简单,具体流程如下:

    1、在一个.proto字符类文件中用IDL来描述用户自定义的数据类型和服务

    2、用protoc编译器编译文件并产生自定义数据类型和服务的api源代码

    3、在server端实现.proto中定义的服务函数

    4、在client端通过自动产生的stub来调用服务

    下面我们就来示范gRPC的编程流程。gRPC支持下面这几种服务类型:

    1、Unary:独立的一对client-request/server-response,是我们常用的http交互模式

    2、Server-Streaming:client发出一个request后从server端接收一串多个response

    3、Client-Streaming:client向server发送一串多个request后从server接收一个response

    4、Bidirectional-Streaming:由client首先发送request启动连接,然后在这个连接上两端可以不断交互信息。

    在本篇讨论中我们先示范Unary-service的编程流程,下面是.proto文件内容:

    syntax = "proto3";
    
    import "google/protobuf/wrappers.proto";
    import "scalapb/scalapb.proto";
    
    package learn.grpc.services;
    
    /*
     * Returns a greeting for the given person optionally including a custom message.
     */
    service HelloWorld {
      rpc SayHello(ToBeGreeted) returns (Greeting) {}
    }
    
    message Person {
       string name = 1;
    }
    
    message ToBeGreeted {
      Person person = 1;
      google.protobuf.StringValue msg = 2;
    }
    
    message Greeting {
      string message = 1;
    }

    这段IDL描述了一个HelloWorld服务,包括了一个服务函数SayHello。三种数据类型:Person,ToBeGreeted,Greeting。通过对.proto文件进行编译后产生文件中包括一个HelloWorldGrpc.scala文件,里面提供了一些重要的api:

    trait HelloWorld -> 用于实现HelloWorld服务的trait
    trait HelloWorldBlockingClient -> 用于实现客户端stub
    class HelloWorldBlockingStub -> blocking客户端stub
    class HelloWorldStub -> non-blocking客户端stub
    def bindService -> 服务类型绑带方法

    我们先实现HelloWorld服务:

     class HelloService extends HelloWorldGrpc.HelloWorld {
        override def sayHello(request: ToBeGreeted): Future[Greeting] = {
          val greeter = request.person match {
            case Some(p) => p.name
            case None => "friendo"
          }
          Future.successful(Greeting(message = s"Hello $greeter, ${request.msg}"))
        }
      }

    可以看到我们直接使用了IDL描述的自定义数据类型如:ToBeGreeted,Greeting。在客户端调用服务并输出返回结果response:

        //build connection channel
        val channel = io.grpc.ManagedChannelBuilder
          .forAddress("LocalHost",50051)
            .usePlaintext(true)
          .build()
    
        //construct requestHelloService
        val greeter = ToBeGreeted()
          .withMsg("remote greetings!")
          .withPerson(ToBeGreeted.Person("mickey"))
    
        //async call
        val asyncStub: HelloWorldGrpc.HelloWorldStub = HelloWorldGrpc.stub(channel)
        val futResponse: Future[Greeting] = asyncStub.sayHello(greeter)
    
        import scala.concurrent.ExecutionContext.Implicits.global
        futResponse.foreach(greeting => println(greeting.message))
    
        val greeter2 = ToBeGreeted(person = Some(Person("jacky")),msg = Some("how are you?"))
        //sync call
        val syncStub: HelloWorldGrpc.HelloWorldBlockingClient = HelloWorldGrpc.blockingStub(channel)
        val response: Greeting = syncStub.sayHello(greeter2)
    
        println(s"${response.message}")

    下面是bindService方法的使用示范:

      def main(args: Array[String]): Unit = {
        val service = HelloWorldGrpc.bindService(new HelloService,ExecutionContext.global)
        runServer(service)
      }

    runServer函数定义如下:

    package learn.grpc.server
    import io.grpc.{ServerBuilder,ServerServiceDefinition}
    
    trait gRPCServer {
      def runServer(service: ServerServiceDefinition): Unit = {
        val server = ServerBuilder
          .forPort(50051)
          .addService(service)
          .build
          .start
    
        // make sure our server is stopped when jvm is shut down
        Runtime.getRuntime.addShutdownHook(new Thread() {
          override def run(): Unit = server.shutdown()
        })
    
        server.awaitTermination()
      }
    
    }

    注意我们还使用了io.grpc库里的类型和方法,这是protobuf项目提供的一个标准库。在客户端也需要使用它来构建通道:

       //build connection channel
        val channel = io.grpc.ManagedChannelBuilder
          .forAddress("LocalHost",50051)
            .usePlaintext(true)
          .build()

    我们将在后面的讨论里介绍gRPC的streaming编程方法。下面是本次示范的源代码:

    project/scalapb.sbt

    addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
    libraryDependencies += "com.thesamet.scalapb" %% "compilerplugin" % "0.7.1"

    build.sbt

    import scalapb.compiler.Version.scalapbVersion
    import scalapb.compiler.Version.grpcJavaVersion
    name := "learn-gRPC"
    version := "0.1"
    scalaVersion := "2.12.6"
    libraryDependencies ++= Seq(
      "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
      "io.grpc" % "grpc-netty" % grpcJavaVersion,
      "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion
    )
    PB.targets in Compile := Seq(
      scalapb.gen() -> (sourceManaged in Compile).value
    )

    src/main/protobuf/hello.proto

    syntax = "proto3";
    
    import "google/protobuf/wrappers.proto";
    import "scalapb/scalapb.proto";
    
    package learn.grpc.services;
    
    /*
     * Returns a greeting for the given person optionally including a custom message.
     */
    service HelloWorld {
      rpc SayHello(ToBeGreeted) returns (Greeting) {}
    }
    
    message Person {
       string name = 1;
    }
    
    message ToBeGreeted {
      Person person = 1;
      google.protobuf.StringValue msg = 2;
    }
    
    message Greeting {
      string message = 1;
    }

    src/main/scala/gRPCServer.scala

    package learn.grpc.server
    import io.grpc.{ServerBuilder,ServerServiceDefinition}
    
    trait gRPCServer {
      def runServer(service: ServerServiceDefinition): Unit = {
        val server = ServerBuilder
          .forPort(50051)
          .addService(service)
          .build
          .start
    
        // make sure our server is stopped when jvm is shut down
        Runtime.getRuntime.addShutdownHook(new Thread() {
          override def run(): Unit = server.shutdown()
        })
    
        server.awaitTermination()
      }
    
    }

    src/main/scala/HelloServer.scala

    package learn.grpc.hello.server
    import learn.grpc.services.hello._
    import learn.grpc.server.gRPCServer
    import scala.concurrent._
    
    object HelloServer extends gRPCServer {
    
      class HelloService extends HelloWorldGrpc.HelloWorld {
        override def sayHello(request: ToBeGreeted): Future[Greeting] = {
          val greeter = request.person match {
            case Some(p) => p.name
            case None => "friendo"
          }
          Future.successful(Greeting(message = s"Hello $greeter, ${request.msg}"))
        }
      }
    
      def main(args: Array[String]): Unit = {
        val service = HelloWorldGrpc.bindService(new HelloService,ExecutionContext.global)
        runServer(service)
      }
    }

    src/main/scala/HelloClient.scala

    package learn.grpc.hello.client
    import learn.grpc.services.hello.ToBeGreeted.Person
    import learn.grpc.services.hello._
    
    import scala.concurrent.Future
    object HelloClient {
      def main(args: Array[String]): Unit = {
    
        //build connection channel
        val channel = io.grpc.ManagedChannelBuilder
          .forAddress("LocalHost",50051)
            .usePlaintext(true)
          .build()
    
        //construct requestHelloService
        val greeter = ToBeGreeted()
          .withMsg("remote greetings!")
          .withPerson(ToBeGreeted.Person("mickey"))
    
        //async call
        val asyncStub: HelloWorldGrpc.HelloWorldStub = HelloWorldGrpc.stub(channel)
        val futResponse: Future[Greeting] = asyncStub.sayHello(greeter)
    
        import scala.concurrent.ExecutionContext.Implicits.global
        futResponse.foreach(greeting => println(greeting.message))
    
        val greeter2 = ToBeGreeted(person = Some(Person("jacky")),msg = Some("how are you?"))
        //sync call
        val syncStub: HelloWorldGrpc.HelloWorldBlockingClient = HelloWorldGrpc.blockingStub(channel)
        val response: Greeting = syncStub.sayHello(greeter2)
    
        println(s"${response.message}")
    
    
      }
    
    }

     

  • 相关阅读:
    志愿者招募 [NOI2008] [鬼畜网络流]
    莫队入门
    分块入门
    高速公路 [HAOI2012] [线段树]
    游历校园 [COGS 614] [欧拉图]
    网络吞吐量 [CQOI2015] [网络流]
    LeetCode 27. Remove Element
    LeetCode 26. Remove Duplicates from Sorted Array
    LeetCode 21. Merge Two Sorted Lists
    LeetCode 20. Valid Parentheses
  • 原文地址:https://www.cnblogs.com/tiger-xc/p/9013978.html
Copyright © 2011-2022 走看看