zoukankan      html  css  js  c++  java
  • java版gRPC实战之三:服务端流

    欢迎访问我的GitHub

    https://github.com/zq2599/blog_demos

    内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;

    《java版gRPC实战》全系列链接

    1. 用proto生成代码
    2. 服务发布和调用
    3. 服务端流
    4. 客户端流
    5. 双向流
    6. 客户端动态获取服务端地址
    7. 基于eureka的注册发现

    关于gRPC定义的四种类型

    本文是《java版gRPC实战》系列的第三篇,前文咱们实战体验了简单的RPC请求和响应,那种简单的请求响应方式其实只是gRPC定义的四种类型之一,这里给出《gRPC 官方文档中文版》对这四种gRPC类型的描述:

    1. 简单 RPC:客户端使用存根(stub)发送请求到服务器并等待响应返回,就像平常的函数调用一样;
    2. 服务器端流式 RPC:客户端发送请求到服务器,拿到一个流去读取返回的消息序列。 客户端读取返回的流,直到里面没有任何消息;(即本篇内容)
    3. 客户端流式 RPC:客户端写入一个消息序列并将其发送到服务器,同样也是使用流。一旦 客户端完成写入消息,它等待服务器完成读取返回它的响应;
    4. 双向流式 RPC:是双方使用读写流去发送一个消息序列。两个流独立操作,因此客户端和服务器 可以以任意喜欢的顺序读写:比如, 服务器可以在写入响应前等待接收所有的客户端消息,或者可以交替 的读取和写入消息,或者其他读写的组合。 每个流中的消息顺序被预留;

    本篇概览

    本篇是服务端流类型的gRPC服务实战,包括以下内容:

    1. 开发一个gRPC服务,类型是服务端流;
    2. 开发一个客户端,调用前面发布的gRPC服务;
    3. 验证;
    • 不多说了,开始上代码;

    源码下载

    名称 链接 备注
    项目主页 https://github.com/zq2599/blog_demos 该项目在GitHub上的主页
    git仓库地址(https) https://github.com/zq2599/blog_demos.git 该项目源码的仓库地址,https协议
    git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
    • 这个git项目中有多个文件夹,《java版gRPC实战》系列的源码在grpc-tutorials文件夹下,如下图红框所示:

    在这里插入图片描述

    • grpc-tutorials文件夹下有多个目录,本篇文章对应的服务端代码在server-stream-server-side目录下,客户端代码在server-stream-client-side目录下,如下图:

    在这里插入图片描述

    开发一个gRPC服务,类型是服务端流

    • 首先要开发的是gRPC服务端,一共要做下图所示的七件事:

    在这里插入图片描述

    • 打开grpc-lib模块,在src/main/proto目录下新增文件mall.proto,里面定一个了一个gRPC方法ListOrders及其入参和返回对象,内容如下,要注意的是返回值要用关键字stream修饰,表示该接口类型是服务端流:
    syntax = "proto3";
    
    option java_multiple_files = true;
    // 生成java代码的package
    option java_package = "com.bolingcavalry.grpctutorials.lib";
    // 类名
    option java_outer_classname = "MallProto";
    
    // gRPC服务,这是个在线商城的订单查询服务
    service OrderQuery {
        // 服务端流式:订单列表接口,入参是买家信息,返回订单列表(用stream修饰返回值)
        rpc ListOrders (Buyer) returns (stream Order) {}
    }
    
    // 买家ID
    message Buyer {
        int32 buyerId = 1;
    }
    
    // 返回结果的数据结构
    message Order {
        // 订单ID
        int32 orderId = 1;
        // 商品ID
        int32 productId = 2;
        // 交易时间
        int64 orderTime = 3;
        // 买家备注
        string buyerRemark = 4;
    }
    
    • 双击下图红框位置的generateProto,即可根据proto生成java代码:

    在这里插入图片描述

    • 新生成的java代码如下图红框:

    在这里插入图片描述

    • 在父工程grpc-turtorials下面新建名为server-stream-server-side的模块,其build.gradle内容如下:
    // 使用springboot插件
    plugins {
        id 'org.springframework.boot'
    }
    
    dependencies {
        implementation 'org.projectlombok:lombok'
        implementation 'org.springframework.boot:spring-boot-starter'
        // 作为gRPC服务提供方,需要用到此库
        implementation 'net.devh:grpc-server-spring-boot-starter'
        // 依赖自动生成源码的工程
        implementation project(':grpc-lib')
    }
    
    • 新建配置文件application.yml:
    spring:
      application:
        name: server-stream-server-side
    # gRPC有关的配置,这里只需要配置服务端口号
    grpc:
      server:
        port: 9899
    
    • 启动类:
    package com.bolingcavalry.grpctutorials;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class ServerStreamServerSideApplication {
        public static void main(String[] args) {
            SpringApplication.run(ServerStreamServerSideApplication.class, args);
        }
    }
    
    • 接下来是最关键的gRPC服务,代码如下,可见responseObserver.onNext方法被多次调用,用以向客户端持续输出数据,最后通过responseObserver.onCompleted结束输出:
    package com.bolingcavalry.grpctutorials;
    
    import com.bolingcavalry.grpctutorials.lib.Buyer;
    import com.bolingcavalry.grpctutorials.lib.Order;
    import com.bolingcavalry.grpctutorials.lib.OrderQueryGrpc;
    import io.grpc.stub.StreamObserver;
    import net.devh.boot.grpc.server.service.GrpcService;
    import java.util.ArrayList;
    import java.util.List;
    
    @GrpcService
    public class GrpcServerService extends OrderQueryGrpc.OrderQueryImplBase {
    
        /**
         * mock一批数据
         * @return
         */
        private static List<Order> mockOrders(){
            List<Order> list = new ArrayList<>();
            Order.Builder builder = Order.newBuilder();
    
            for (int i = 0; i < 10; i++) {
                list.add(builder
                        .setOrderId(i)
                        .setProductId(1000+i)
                        .setOrderTime(System.currentTimeMillis()/1000)
                        .setBuyerRemark(("remark-" + i))
                        .build());
            }
    
            return list;
        }
    
        @Override
        public void listOrders(Buyer request, StreamObserver<Order> responseObserver) {
            // 持续输出到client
            for (Order order : mockOrders()) {
                responseObserver.onNext(order);
            }
            // 结束输出
            responseObserver.onCompleted();
        }
    }
    
    • 至此,服务端开发完成,咱们再开发一个springboot应用作为客户端,看看如何远程调用listOrders接口,得到responseObserver.onNext方法输出的数据;

    开发一个客户端,调用前面发布的gRPC服务

    • 客户端模块的基本功能是提供一个web接口,其内部会调用服务端的listOrders接口,将得到的数据返回给前端,如下图:

    在这里插入图片描述

    • 在父工程grpc-turtorials下面新建名为server-stream-client-side的模块,其build.gradle内容如下:
    plugins {
        id 'org.springframework.boot'
    }
    
    dependencies {
        implementation 'org.projectlombok:lombok'
        implementation 'org.springframework.boot:spring-boot-starter'
        implementation 'org.springframework.boot:spring-boot-starter-web'
        implementation 'net.devh:grpc-client-spring-boot-starter'
        implementation project(':grpc-lib')
    }
    
    • 应用配置信息application.yml内容如下,可见是端口和gRPC服务端地址的配置:
    server:
      port: 8081
    spring:
      application:
        name: server-stream-client-side
    
    grpc:
      client:
        # gRPC配置的名字,GrpcClient注解会用到
        server-stream-server-side:
          # gRPC服务端地址
          address: 'static://127.0.0.1:9899'
          enableKeepAlive: true
          keepAliveWithoutCalls: true
          negotiationType: plaintext
    
    • 服务端的listOrders接口返回的Order对象里面有很多gRPC相关的内容,不适合作为web接口的返回值,因此定义一个DispOrder类作为web接口返回值:
    package com.bolingcavalry.grpctutorials;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import java.io.Serializable;
    
    @Data
    @AllArgsConstructor
    public class DispOrder {
        private int orderId;
        private int productId;
        private String orderTime;
        private String buyerRemark;
    }
    
    • 平淡无奇的启动类:
    package com.bolingcavalry.grpctutorials;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class ServerStreamClientSideApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(ServerStreamClientSideApplication.class, args);
        }
    }
    
    • 重点来了,GrpcClientService.java,里面展示了如何远程调用gRPC服务的listOrders接口,可见对于服务端流类型的接口,客户端这边通过stub调用会得到Iterator类型的返回值,接下来要做的就是遍历Iterator:
    package com.bolingcavalry.grpctutorials;
    
    import com.bolingcavalry.grpctutorials.lib.Buyer;
    import com.bolingcavalry.grpctutorials.lib.Order;
    import com.bolingcavalry.grpctutorials.lib.OrderQueryGrpc;
    import io.grpc.StatusRuntimeException;
    import lombok.extern.slf4j.Slf4j;
    import net.devh.boot.grpc.client.inject.GrpcClient;
    import org.springframework.stereotype.Service;
    import java.time.Instant;
    import java.time.LocalDateTime;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    @Service
    @Slf4j
    public class GrpcClientService {
    
        @GrpcClient("server-stream-server-side")
        private OrderQueryGrpc.OrderQueryBlockingStub orderQueryBlockingStub;
    
        public List<DispOrder> listOrders(final String name) {
            // gRPC的请求参数
            Buyer buyer = Buyer.newBuilder().setBuyerId(101).build();
    
            // gRPC的响应
            Iterator<Order> orderIterator;
    
            // 当前方法的返回值
            List<DispOrder> orders = new ArrayList<>();
    
            // 通过stub发起远程gRPC请求
            try {
                orderIterator = orderQueryBlockingStub.listOrders(buyer);
            } catch (final StatusRuntimeException e) {
                log.error("error grpc invoke", e);
                return new ArrayList<>();
            }
    
            DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    
            log.info("start put order to list");
            while (orderIterator.hasNext()) {
                Order order = orderIterator.next();
    
                orders.add(new DispOrder(order.getOrderId(),
                                        order.getProductId(),
                                        // 使用DateTimeFormatter将时间戳转为字符串
                                        dtf.format(LocalDateTime.ofEpochSecond(order.getOrderTime(), 0, ZoneOffset.of("+8"))),
                                        order.getBuyerRemark()));
                log.info("");
            }
    
            log.info("end put order to list");
    
            return orders;
        }
    }
    
    • 最后做一个controller类,对外提供一个web接口,里面会调用GrpcClientService的方法:
    package com.bolingcavalry.grpctutorials;
    
    import com.bolingcavalry.grpctutorials.lib.Order;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    import java.util.List;
    
    @RestController
    public class GrpcClientController {
    
        @Autowired
        private GrpcClientService grpcClientService;
    
        @RequestMapping("/")
        public List<DispOrder> printMessage(@RequestParam(defaultValue = "will") String name) {
            return grpcClientService.listOrders(name);
        }
    }
    
    • 至此,编码完成,开始验证

    验证

    1. 启动server-stream-server-side,启动成功后会监听9989端口:

    在这里插入图片描述

    1. 启动server-stream-client-side,再在浏览器访问:http://localhost:8081/?name=Tom ,得到结果如下(firefox自动格式化json数据),可见成功地获取了gRPC的远程数据:

    在这里插入图片描述

    至此,服务端流类型的gRPC接口的开发和使用实战就完成了,接下来的章节还会继续学习另外两种类型;

    你不孤单,欣宸原创一路相伴

    1. Java系列
    2. Spring系列
    3. Docker系列
    4. kubernetes系列
    5. 数据库+中间件系列
    6. DevOps系列

    欢迎关注公众号:程序员欣宸

    微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界...
    https://github.com/zq2599/blog_demos

  • 相关阅读:
    级联操作
    深入解析http协议
    http状态码
    数据库在一对一、一对多、多对多怎么设计表关系
    [转载]C#深拷贝的方法
    TraceSource记录程序日志
    .NET 垃圾回收与内存泄漏
    DevExpress GridControl使用方法总结
    DevExpress使用技巧总结
    Oracle 错误代码小结
  • 原文地址:https://www.cnblogs.com/bolingcavalry/p/15270672.html
Copyright © 2011-2022 走看看