zoukankan      html  css  js  c++  java
  • grpc详解 java版

    grpc 详解 java版

    Java中gRPC的基本教程介绍。

    本教程提供了有关使用gRPC的基本Java程序员介绍。

    通过遍历此示例,您将学习如何:

    • grpc设计的核心概念。
    • 在.proto文件中定义服务。
    • 使用协议缓冲区编译器生成服务器和客户端代码。
    • 使用Java gRPC API为您的服务编写一个简单的客户端和服务器。

    假定您已经阅读了gRPC简介并且熟悉协议缓冲区。请注意,本教程中的示例使用了 proto3协议缓冲区语言的版本:您可以在proto3语言指南中找到更多信息和Java生成的代码指南

    为什么要使用gRPC?

    我们的示例是一个简单的路由映射应用程序,它使客户端可以获取有关其路由功能的信息,创建其路由的摘要以及与服务器和其他客户端交换路由信息(例如流量更新)。

    借助gRPC,我们可以在一个.proto文件中定义一次服务,并以gRPC支持的任何语言生成客户端和服务器,而这又可以在从大型数据中心内的服务器到您自己的平板电脑的各种环境中运行– gRPC为您处理不同的语言和环境。我们还获得了使用协议缓冲区的所有优点,包括有效的序列化,简单的IDL和轻松的接口更新。

    核心概念

    GRPC包含三个不同的层:StubChannelTransport

    Stu

    Stub层是大多数开发人员都可以接触到的,它为您要适应的任何数据模型/ IDL /接口提供类型安全的绑定。gRPC带有协议缓冲区编译器的插件,该插件可以从.proto文件中生成Stub接口,但是与其他数据模型/ IDL的绑定是容易的并且值得鼓励。

    Channel

    通道层是传输处理上的抽象,适合于侦听/装饰,并且比存根(Stub)层向应用程序公开更多的行为。对于应用程序框架而言,使用该层来解决诸如日志记录,监视,身份验证等跨领域的问题很容易。

    Transport

    传输层不费吹灰之力地将字节从导线中取出。它的接口是抽象的,足以允许插入不同的实现。请注意,传输层API被认为是gRPC内部的,并且与包中的核心API相比,其API保证较弱io.grpc

    gRPC带有三种传输实现:

    1. 基于Netty的传输是基于Netty的主要传输实现 。它适用于客户端和服务器。
    2. 基于OkHttp的传输是基于OkHttp的轻量级传输 。它主要用于Android,并且仅用于客户端。
    3. 进程内传输适用于服务器与客户端处于同一进程中的情况。它对于测试很有用,同时也可以安全地用于生产。

    示例代码和设置

    本教程的示例代码在 grpc / grpc-java / examples / src / main / java / io / grpc / examples / routeguide中。要下载示例,请grpc-java通过运行以下命令在存储库中克隆最新版本:

    $ git clone -b v1.34.0 https://github.com/grpc/grpc-java.git
    

    然后将当前目录更改为grpc-java/examples

    $ cd grpc-java/examples
    

    定义服务

    我们的第一步(如您从gRPC简介中所知道的)是使用协议缓冲区定义gRPC服务以及方法请求响应类型。 。您可以在grpc-java / examples / src / main / proto / route_guide.proto中看到完整的.proto文件。 。

    在此示例中生成Java代码时,我们java_package 在.proto中指定了一个文件选项:

    option java_package = "io.grpc.examples.routeguide";
    

    这指定了我们要用于生成的Java类的包。如果java_package.proto文件中未提供显式选项,则默认情况下将使用proto软件包(使用“ package”关键字指定)。但是,proto软件包通常不能成为良好的Java包,因为proto软件包不应以反向域名开头。如果我们从此.proto生成另一种语言的代码,则该java_package选项无效。

    要定义服务,我们service在.proto文件中指定一个名称:

    service RouteGuide {
       ...
    }
    

    然后,rpc在服务定义中定义方法,并指定其请求和响应类型。gRPC允许您定义四种服务方法,所有这些方法都在RouteGuide服务中使用:

    • 一个简单的RPC,客户端使用存根将请求发送到服务器,然后等待响应返回,就像正常的函数调用一样。

      // Obtains the feature at a given position.
      rpc GetFeature(Point) returns (Feature) {}
      
    • 一个服务器端流RPC,其中客户端发送请求到服务器,并获得一个流中读取消息的序列后面。客户端从返回的流中读取,直到没有更多消息为止。如我们的示例所示,您可以通过stream响应类型之前放置关键字来指定服务器端流方法。

      // Obtains the Features available within the given Rectangle.  Results are
      // streamed rather than returned at once (e.g. in a response message with a
      // repeated field), as the rectangle may cover a large area and contain a
      // huge number of features.
      rpc ListFeatures(Rectangle) returns (stream Feature) {}
      
    • 一个客户端流传输的RPC,其中客户端将消息写入Point序列,并且将它们发送到服务器,再次使用提供的流。客户端写完消息后,它将等待服务器读取所有消息并返回其响应。您可以通过将stream关键字放在请求类型之前来指定客户端流方法。

      // Accepts a stream of Points on a route being traversed, returning a
      // RouteSummary when traversal is completed.
      rpc RecordRoute(stream Point) returns (RouteSummary) {}
      
    • 一个双向流RPC双方都派出使用读写流的消息序列。这两个流是独立运行的,因此客户端和服务器可以按照自己喜欢的顺序进行读写:例如,服务器可以在写响应之前等待接收所有客户端消息,或者可以先读取一条消息再写入一条消息,或其他一些读写组合。每个流中的消息顺序都会保留。您可以通过stream 在请求和响应之前放置关键字来指定这种类型的方法。

      // Accepts a stream of RouteNotes sent while a route is being traversed,
      // while receiving other RouteNotes (e.g. from other users).
      rpc RouteChat(stream RouteNote) returns (stream RouteNote) {}
      

    我们的.proto文件还包含用于服务方法中所有请求和响应类型的协议缓冲区消息类型定义-例如,以下是Point消息类型:

    // Points are represented as latitude-longitude pairs in the E7 representation
    // (degrees multiplied by 10**7 and rounded to the nearest integer).
    // Latitudes should be in the range +/- 90 degrees and longitude should be in
    // the range +/- 180 degrees (inclusive).
    message Point {
      int32 latitude = 1;
      int32 longitude = 2;
    }
    

    生成客户端和服务器代码

    接下来,我们需要根据.proto服务定义生成gRPC客户端和服务器接口。我们使用protoc带有特殊gRPC Java插件的协议缓冲区编译器进行此操作。您需要使用 proto3 编译器(同时支持proto2和proto3语法)以生成gRPC服务。

    使用Gradle或Maven时,protoc构建插件可以生成必要的代码作为构建的一部分。您可以参考grpc-java自述文件有关如何从您自己的.proto文件生成代码的信息。

    以下类是根据我们的服务定义生成的:

    • Feature.javaPoint.javaRectangle.java,和其他含有的所有协议缓存代码来填充,序列化,并检索我们的请求和响应消息的类型。

    • RouteGuideGrpc.java
      

      其中包含(以及一些其他有用的代码):

      • RouteGuide服务器要实现 的基类RouteGuideGrpc.RouteGuideImplBase,其中包含RouteGuide服务中定义的所有方法 。
      • 客户端可以用来与RouteGuide服务器对话的存根类。

    创建服务器

    首先让我们看一下如何创建RouteGuide服务器。如果您只对创建gRPC客户端感兴趣,则可以跳过本节,直接进入创建客户端(尽管您可能仍然会发现它很有趣!)。

    使我们的RouteGuide服务发挥作用有两个部分:

    • 覆盖根据我们的服务定义生成的服务基类:完成我们服务的实际“工作”。
    • 运行gRPC服务器以侦听来自客户端的请求并返回服务响应。

    您可以RouteGuidegrpc-java / examples / src / main / java / io / grpc / examples / routeguide / RouteGuideServer.java中找到我们的示例服务器。 。让我们仔细看看它是如何工作的。

    实施RouteGuide

    如您所见,我们的服务器有一个RouteGuideService扩展生成的RouteGuideGrpc.RouteGuideImplBase抽象类的类:

    private static class RouteGuideService extends RouteGuideGrpc.RouteGuideImplBase {
    ...
    }
    

    简单的RPC

    RouteGuideService实现我们所有的服务方法。首先让我们看一下最简单的方法GetFeature(),该方法Point仅从客户端获取a ,并从其数据库中的a中返回相应的特征信息Feature

    @Override
    public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
      responseObserver.onNext(checkFeature(request));
      responseObserver.onCompleted();
    }
    
    ...
    
    private Feature checkFeature(Point location) {
      for (Feature feature : features) {
        if (feature.getLocation().getLatitude() == location.getLatitude()
            && feature.getLocation().getLongitude() == location.getLongitude()) {
          return feature;
        }
      }
    
      // No feature was found, return an unnamed feature.
      return Feature.newBuilder().setName("").setLocation(location).build();
    }
    

    getFeature()方法有两个参数:

    • Point:请求
    • StreamObserver<Feature>:响应观察者,这是服务器用来调用其响应的特殊接口。

    要将我们的回复返回给客户并完成通话,请执行以下操作:

    1. Feature根据服务定义中的说明,我们构造并填充一个响应对象以返回到客户端。在此示例中,我们使用单独的私有checkFeature()方法进行此操作。
    2. 我们使用响应观察者的onNext()方法返回Feature
    3. 我们使用响应观察者的onCompleted()方法来指定我们已经完成了对RPC的处理。

    服务器端流式RPC

    接下来,让我们看一下我们的流式RPC。ListFeatures是服务器端的流式RPC,因此我们需要将多个Features发送回客户端。

    private final Collection<Feature> features;
    
    ...
    
    @Override
    public void listFeatures(Rectangle request, StreamObserver<Feature> responseObserver) {
      int left = min(request.getLo().getLongitude(), request.getHi().getLongitude());
      int right = max(request.getLo().getLongitude(), request.getHi().getLongitude());
      int top = max(request.getLo().getLatitude(), request.getHi().getLatitude());
      int bottom = min(request.getLo().getLatitude(), request.getHi().getLatitude());
    
      for (Feature feature : features) {
        if (!RouteGuideUtil.exists(feature)) {
          continue;
        }
    
        int lat = feature.getLocation().getLatitude();
        int lon = feature.getLocation().getLongitude();
        if (lon >= left && lon <= right && lat >= bottom && lat <= top) {
          responseObserver.onNext(feature);
        }
      }
      responseObserver.onCompleted();
    }
    

    像简单的RPC一样,此方法获取一个请求对象(Rectangle客户端希望在其中找到Feature)和一个StreamObserver响应观察者。

    这次,我们获得Feature返回到客户端所需的尽可能多的对象(在这种情况下,我们根据它们是否在我们的请求中从服务的功能集中选择它们Rectangle),并将每个对象依次写入响应观察器。使用其onNext()方法。最后,就像在简单的RPC中一样,我们使用响应观察者的onCompleted()方法来告诉gRPC我们已经完成了响应的编写。

    客户端流式RPC

    现在,让我们看一些更复杂的东西:客户端流方法RecordRoute()Point从客户端获取s的流,并返回RouteSummary包含有关其行程的信息的单个流。

    @Override
    public StreamObserver<Point> recordRoute(final StreamObserver<RouteSummary> responseObserver) {
      return new StreamObserver<Point>() {
        int pointCount;
        int featureCount;
        int distance;
        Point previous;
        long startTime = System.nanoTime();
    
        @Override
        public void onNext(Point point) {
          pointCount++;
          if (RouteGuideUtil.exists(checkFeature(point))) {
            featureCount++;
          }
          // For each point after the first, add the incremental distance from the previous point
          // to the total distance value.
          if (previous != null) {
            distance += calcDistance(previous, point);
          }
          previous = point;
        }
    
        @Override
        public void onError(Throwable t) {
          logger.log(Level.WARNING, "Encountered error in recordRoute", t);
        }
    
        @Override
        public void onCompleted() {
          long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
          responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
              .setFeatureCount(featureCount).setDistance(distance)
              .setElapsedTime((int) seconds).build());
          responseObserver.onCompleted();
        }
      };
    }
    

    如您所见,像以前的方法类型一样,我们的方法获得一个 StreamObserver响应观察者参数,但是这次它返回a StreamObserver供客户端编写其Point

    在方法主体中,我们实例化了一个匿名对象StreamObserver以返回,其中:

    • onNext()每次客户端将a写入Point消息流时,都应重写该方法以获取功能和其他信息。
    • 覆盖此onCompleted()方法(在客户端完成写消息时调用)以填充并构建我们的RouteSummary。然后,我们调用我们的方法本身的响应观察者的onNext()我们RouteSummary,然后调用它的onCompleted()方法来完成从服务器端调用。

    双向流式RPC

    最后,让我们看一下双向流式RPC RouteChat()

    @Override
    public StreamObserver<RouteNote> routeChat(final StreamObserver<RouteNote> responseObserver) {
      return new StreamObserver<RouteNote>() {
        @Override
        public void onNext(RouteNote note) {
          List<RouteNote> notes = getOrCreateNotes(note.getLocation());
    
          // Respond with all previous notes at this location.
          for (RouteNote prevNote : notes.toArray(new RouteNote[0])) {
            responseObserver.onNext(prevNote);
          }
    
          // Now add the new note to the list
          notes.add(note);
        }
    
        @Override
        public void onError(Throwable t) {
          logger.log(Level.WARNING, "Encountered error in routeChat", t);
        }
    
        @Override
        public void onCompleted() {
          responseObserver.onCompleted();
        }
      };
    }
    

    与我们的客户端流示例一样,我们都获取并返回了 StreamObserver响应观察器,只是这次我们在客户端仍将消息写入 消息流时通过方法的响应观察器返回值。此处读取和写入的语法与我们的客户端流和服务器流方法完全相同。尽管双方总是会按照对方的写入顺序获得对方的消息,但是客户端和服务器都可以按照任何顺序进行读取和写入-流完全独立地运行。

    启动服务器

    一旦实现了所有方法,我们还需要启动gRPC服务器,以便客户端可以实际使用我们的服务。以下代码段显示了我们如何为RouteGuide服务执行此操作:

    public RouteGuideServer(int port, URL featureFile) throws IOException {
      this(ServerBuilder.forPort(port), port, RouteGuideUtil.parseFeatures(featureFile));
    }
    
    /** Create a RouteGuide server using serverBuilder as a base and features as data. */
    public RouteGuideServer(ServerBuilder<?> serverBuilder, int port, Collection<Feature> features) {
      this.port = port;
      server = serverBuilder.addService(new RouteGuideService(features))
          .build();
    }
    ...
    public void start() throws IOException {
      server.start();
      logger.info("Server started, listening on " + port);
     ...
    }
    

    如您所见,我们使用构建和启动服务器ServerBuilder

    为此,我们:

    1. 使用构建器的forPort()方法指定我们要用于侦听客户端请求的地址和端口。
    2. 创建我们的服务实现类的实例,RouteGuideService 并将其传递给构建器的addService()方法。
    3. 调用build()start()在构建器上为我们的服务创建并启动RPC服务器。

    创建客户端

    在本节中,我们将研究为我们的RouteGuide 服务创建一个客户端。您可以在grpc-java / examples / src / main / java / io / grpc / examples / routeguide / RouteGuideClient.java中看到我们完整的示例客户端代码。

    Instantiating a stub

    要调用服务方法,我们首先需要创建一个存根,或者说,创建两个存根:

    • 阻塞/同步存根:这意味着该RPC调用等待服务器响应,并且将或者返回一个响应或抛出异常。
    • 一个非阻塞/异步存根,它对服务器进行非阻塞调用,在该服务器上异步返回响应。您只能使用异步存根进行某些类型的流式调用。

    首先,我们需要为存根创建一个gRPC通道,指定我们要连接的服务器地址和端口:

    public RouteGuideClient(String host, int port) {
      this(ManagedChannelBuilder.forAddress(host, port).usePlaintext());
    }
    
    /** Construct client for accessing RouteGuide server using the existing channel. */
    public RouteGuideClient(ManagedChannelBuilder<?> channelBuilder) {
      channel = channelBuilder.build();
      blockingStub = RouteGuideGrpc.newBlockingStub(channel);
      asyncStub = RouteGuideGrpc.newStub(channel);
    }
    

    我们使用ManagedChannelBuilder创建 channel。

    现在,我们可以使用channel创建stub,method是使用从.proto生成的RouteGuideGrpc类中提供的newStub和newBlockingStub方法。

    blockingStub = RouteGuideGrpc.newBlockingStub(channel);
    asyncStub = RouteGuideGrpc.newStub(channel);
    

    Calling service method

    现在让我们看看我们如何调用我们的服务方法。

    简单的RPC

    GetFeature在阻塞存根上调用简单的RPC与调用本地方法一样简单。

    Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();
    Feature feature;
    try {
      feature = blockingStub.getFeature(request);
    } catch (StatusRuntimeException e) {
      logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      return;
    }
    

    我们创建并填充一个请求协议缓冲区对象(在本例中为Point),将其传递给getFeature()阻塞存根上的方法,然后返回一个 Feature

    如果发生错误,则将其编码为Status,我们可以从获取 StatusRuntimeException

    服务器端流式RPC

    接下来,让我们看一下服务器端对的流式调用ListFeatures,该调用返回一个geo的流Feature

    Rectangle request =
        Rectangle.newBuilder()
            .setLo(Point.newBuilder().setLatitude(lowLat).setLongitude(lowLon).build())
            .setHi(Point.newBuilder().setLatitude(hiLat).setLongitude(hiLon).build()).build();
    Iterator<Feature> features;
    try {
      features = blockingStub.listFeatures(request);
    } catch (StatusRuntimeException ex) {
      logger.log(Level.WARNING, "RPC failed: {0}", e.getStatus());
      return;
    }
    

    如您所见,它与我们刚刚看过的简单RPC非常相似,除了返回一个Feature,该方法返回一个Iterator,客户端可以用来读取所有返回的Features ,而不是返回单个。

    客户端流式RPC

    现在,让事情变得更复杂一些:客户端流方法 RecordRoute,我们将Points流发送到服务器并返回一个RouteSummary。对于这种方法,我们需要使用异步存根。如果您已经阅读了“创建服务器”,那么其中的一些内容可能看起来非常熟悉-异步流式RPC在两侧都以类似的方式实现。

    public void recordRoute(List<Feature> features, int numPoints) throws InterruptedException {
      info("*** RecordRoute");
      final CountDownLatch finishLatch = new CountDownLatch(1);
      StreamObserver<RouteSummary> responseObserver = new StreamObserver<RouteSummary>() {
        @Override
        public void onNext(RouteSummary summary) {
          info("Finished trip with {0} points. Passed {1} features. "
              + "Travelled {2} meters. It took {3} seconds.", summary.getPointCount(),
              summary.getFeatureCount(), summary.getDistance(), summary.getElapsedTime());
        }
    
        @Override
        public void onError(Throwable t) {
          Status status = Status.fromThrowable(t);
          logger.log(Level.WARNING, "RecordRoute Failed: {0}", status);
          finishLatch.countDown();
        }
    
        @Override
        public void onCompleted() {
          info("Finished RecordRoute");
          finishLatch.countDown();
        }
      };
    
      StreamObserver<Point> requestObserver = asyncStub.recordRoute(responseObserver);
      try {
        // Send numPoints points randomly selected from the features list.
        Random rand = new Random();
        for (int i = 0; i < numPoints; ++i) {
          int index = rand.nextInt(features.size());
          Point point = features.get(index).getLocation();
          info("Visiting point {0}, {1}", RouteGuideUtil.getLatitude(point),
              RouteGuideUtil.getLongitude(point));
          requestObserver.onNext(point);
          // Sleep for a bit before sending the next one.
          Thread.sleep(rand.nextInt(1000) + 500);
          if (finishLatch.getCount() == 0) {
            // RPC completed or errored before we finished sending.
            // Sending further requests won't error, but they will just be thrown away.
            return;
          }
        }
      } catch (RuntimeException e) {
        // Cancel RPC
        requestObserver.onError(e);
        throw e;
      }
      // Mark the end of requests
      requestObserver.onCompleted();
    
      // Receiving happens asynchronously
      finishLatch.await(1, TimeUnit.MINUTES);
    }
    

    如您所见,要调用此方法,我们需要创建一个StreamObserver,实现一个特殊的接口供服务器使用其RouteSummary 响应进行调用。在我们StreamObserver我们:

    • onNext()当服务器将a写入RouteSummary消息流时,重写用于打印返回的信息的方法。
    • 重写该onCompleted()方法(在服务器完成其侧面的调用时调用)以减小a CountDownLatch,我们可以检查该方法以查看服务器是否已完成写入。

    然后,将传递StreamObserver给异步存根的recordRoute() 方法,并返回我们自己的StreamObserver请求观察器,以编写 Points发送给服务器。一旦完成编写点,就使用请求观察者的onCompleted()方法来告诉gRPC我们已经完成了在客户端的编写。完成后,我们CountDownLatch将检查服务器端是否已完成。

    双向流式RPC

    最后,让我们看一下双向流式RPC RouteChat()

    public void routeChat() throws Exception {
      info("*** RoutChat");
      final CountDownLatch finishLatch = new CountDownLatch(1);
      StreamObserver<RouteNote> requestObserver =
          asyncStub.routeChat(new StreamObserver<RouteNote>() {
            @Override
            public void onNext(RouteNote note) {
              info("Got message "{0}" at {1}, {2}", note.getMessage(), note.getLocation()
                  .getLatitude(), note.getLocation().getLongitude());
            }
    
            @Override
            public void onError(Throwable t) {
              Status status = Status.fromThrowable(t);
              logger.log(Level.WARNING, "RouteChat Failed: {0}", status);
              finishLatch.countDown();
            }
    
            @Override
            public void onCompleted() {
              info("Finished RouteChat");
              finishLatch.countDown();
            }
          });
    
      try {
        RouteNote[] requests =
            {newNote("First message", 0, 0), newNote("Second message", 0, 1),
                newNote("Third message", 1, 0), newNote("Fourth message", 1, 1)};
    
        for (RouteNote request : requests) {
          info("Sending message "{0}" at {1}, {2}", request.getMessage(), request.getLocation()
              .getLatitude(), request.getLocation().getLongitude());
          requestObserver.onNext(request);
        }
      } catch (RuntimeException e) {
        // Cancel RPC
        requestObserver.onError(e);
        throw e;
      }
      // Mark the end of requests
      requestObserver.onCompleted();
    
      // Receiving happens asynchronously
      finishLatch.await(1, TimeUnit.MINUTES);
    }
    

    正如我们的客户端流的例子,我们都get和返回 StreamObserver响应的观察者,但这次我们通过我们的方法的反应派观察员值,而服务器还在写邮件给他们的 消息流。此处读取和写入的语法与我们的客户端流方法完全相同。尽管双方总是会按照对方的写入顺序获得对方的消息,但是客户端和服务器都可以按照任何顺序进行读取和写入-流完全独立地运行。

    试试看!

    按照示例目录自述文件中的说明进行操作 构建并运行客户端和服务器。

    参考链接

    https://grpc.io/docs/languages/java/quickstart/
    https://github.com/grpc/grpc-java/blob/master/README.md
    如果觉得写得不好,欢迎指出;如果觉得写得不错,欢迎亲们赞赏。
    对卡卡的赞赏

  • 相关阅读:
    常用html设置:
    Java Enum
    ajax
    Utils使用
    jdk免安装对应配置
    jdk mvn下载--操作系统
    SpringMvc 文件上传后台处理
    SpringMvc 获取ApplicationContext
    Jenkins 持续集成
    自定义 directive pagination
  • 原文地址:https://www.cnblogs.com/O-ll-O/p/14080769.html
Copyright © 2011-2022 走看看