zoukankan      html  css  js  c++  java
  • java grpc 简单易懂 ---2

     

     

     

     

     

    欢迎回来!

    2.请求流接口

    (客户端可以源源不断的给服务端传参数,服务端会源源不断的接受服务端的参数,最后在客户端完成请求的时候,服务端返回一个结果)

    在.proto文件中新加一个方法,这个方法的参数被 stream 关键字修饰

    rpc methodRequestStream(stream Request) returns (Result) {}
    

      

    然后用maven,清理一下缓存,重新编译一下

    2.1.服务端

     重新编译之后,实现刚刚新加的方法

        @Override
        public StreamObserver<Request> methodRequestStream(StreamObserver<Result> responseObserver) {
            return new StreamObserver<Request>() {
                @Override
                public void onNext(Request request) {
                    System.out.print("收到了请求 
    ");
                }
    
                @Override
                public void onError(Throwable throwable) {
    
                }
    
                @Override
                public void onCompleted() {
                    Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build();
                    responseObserver.onNext(result);
                    responseObserver.onCompleted();
                }
            };
        }
    

      

    (友情提示,如果 StreamObserver  的的泛型是Result 我们就叫 返回流观察者,如果是 Request 就叫请求流观察者,这样好描述一些)

    这个和普通的有点不一样,直接返回了一个 请求流观察者 的接口实现,而且方法的参数还是一个 返回流观察者 ,好像搞反了一样,至于为什么,一会在客户端那里 统一说

    2.2.客户端

    请求流式异步调用,普通的是同步调用,我们在普通的方法里创建的实例 也是同步的,所以我们要在 JavaGrpcClient 中新加一个 异步调用的方法,添加一个异步的实例

    public <Result> Result runAsync(Functional<TestServiceGrpc.TestServiceStub,Result> functional)
        {
            TestServiceGrpc.TestServiceStub testServiceStub =
                    TestServiceGrpc.newStub(channel);
    
            return functional.run(testServiceStub);
        }

    TestServiceGrpc.newStub 返回的是一个异步的实例

    再加一个测试

    @Test
        public void contextLoads2() {
            Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build();
            StreamObserver<Result> responseObserver = new StreamObserver<Result>() {
                @Override
                public void onNext(Result result) {
                    System.out.print("返回了结果 
    ");
                }
    
                @Override
                public void onError(Throwable throwable) {
    
                }
    
                @Override
                public void onCompleted() {
    
                }
            };
            StreamObserver<Request> result = javaGrpcClient.runAsync(o -> o.methodRequestStream(responseObserver));
            result.onNext(request);
            result.onNext(request);
            result.onNext(request);
            result.onCompleted();
    
            try {
                Thread.sleep(600000);
            }
            catch (Exception ex){}
        }
    

      

    这里我们实现了一个 返回流观察者 

    StreamObserver<Result> responseObserver = new StreamObserver<Result>() {
                @Override
                public void onNext(Result result) {
                    System.out.print("返回了结果 
    ");
                }
    
                @Override
                public void onError(Throwable throwable) {
    
                }
    
                @Override
                public void onCompleted() {
    
                }
            };
    

      

    调用方法的时候,将我们实现的 返回流观察者 传进去,返回给我们一个 请求流观察者

    StreamObserver<Request> result = javaGrpcClient.runAsync(o -> o.methodRequestStream(responseObserver));
    

      

    其实这里返回的 请求流观察者 就是服务端那里返回给我们的内个实现,服务端那里 返回流观察者 是我们实现的 传给他的

    由于是异步调用,最后暂停一下,要不测试跑完,程序结束 开没开始就结束了

    try {
        Thread.sleep(600000);
    }
    catch (Exception ex){}
    

      

    运行起来看结果

    服务端的打印

    客户端的打印

    这里我们发送了三次参数过去

    result.onNext(request);
    result.onNext(request);
    result.onNext(request);
    

      

    就相当于 服务端 那边返回的 请求流观察者 被调用了 三次 ,所以就打印了三句话

    发送完参数结束请求

    result.onCompleted();
    

      

    服务端那里的结束请求中调用了一次我们传给他的 返回流观察者 中的 onNext 方法

    所以客户端就打印了一次

    这里会有人问 这里不能返回 多个吗

    不能,虽然 这两个观察者 看上去一样 都是 StreamObserver 接口,但是,这个方法只是请求流调用,在grpc的内部 最后返回的时候 只返回第一个指定的返回只,不管返回了多少个,在客户端那边只会收到 第一个返回的结果

    3.响应流接口

    (和请求流接口完全相反,请求流是异步,响应流是同步,请求流是接受多个请求返回一个结果,响应流是接受一个请求返回多个结果)

     我们在.proto文件中再增加一个方法,这回这个方法的返回值被 stream 关键字修饰

    rpc methodResultStream(Request) returns (stream Result){}
    

      

    清缓存,重新编译

    3.1.服务端

     实现刚刚新加的方法

    @Override
        public void methodResultStream(Request request, StreamObserver<Result> responseObserver) {
            System.out.print("收到了请求 
    ");
            Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build();
            responseObserver.onNext(result);
            responseObserver.onNext(result);
            try {
                Thread.sleep(2000);
            }
            catch (Exception ex){}
            responseObserver.onNext(result);
            responseObserver.onCompleted();
        }
    

      

     

    这里跟普通的差不多,只是我们返回了三次结果

    responseObserver.onNext(result);
    responseObserver.onNext(result);
    try {
        Thread.sleep(2000);
    }
    catch (Exception ex){}
    responseObserver.onNext(result);
    

      

    3.2.客户端

    没啥好加的了,直接上测试

    @Test
        public void contextLoads3() {
            Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build();
            Iterator<Result> result = javaGrpcClient.run(o -> o.methodResultStream(request));
    
            result.forEachRemaining(o ->
            {
                System.out.print("返回了结果 
    ");
            });
            System.out.print("结束 
    ");
        }
    

      

    返回流请求是同步的,所以要调同步的方法,返回了一个迭代器

    Iterator<Result> result = javaGrpcClient.run(o -> o.methodResultStream(request));
    

      

    迭代器中有服务端的所有返回结果

    result.forEachRemaining(o ->
    {
        System.out.print("返回了结果 
    ");
    });
    

      

    运行结果

    服务端结果

    客户端结果

    由于是同步调用,在forEach中会等待服务端的每一个返回结果

    4.双向流接口

     --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    歇会,抽根烟!

    --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

    在.proto文件中再加一个方法

    rpc methodDoubleStream(stream Request) returns (stream Result){}
    

      

    实现

    双向流的服务端和请求流的没啥区别,只是在接收到请求的时候没有立刻结束请求

    @Override
        public StreamObserver<Request> methodDoubleStream(StreamObserver<Result> responseObserver) {
            return new StreamObserver<Request>() {
                @Override
                public void onNext(Request value) {
                    System.out.print("收到了请求 
    ");
                    Result result = Result.newBuilder().setResult1("result1").setResult2("result2").build();
                    responseObserver.onNext(result);
                }
    
                @Override
                public void onError(Throwable t) {
    
                }
    
                @Override
                public void onCompleted() {
                    responseObserver.onCompleted();
                }
            };
        }
    

      

    客户端也没啥区别

    @Test
        public void contextLoads4() {
            Request request = Request.newBuilder().setRequest1("test1").setRequest2("test2").build();
            StreamObserver<Result> responseObserver = new StreamObserver<Result>() {
                @Override
                public void onNext(Result result) {
                    System.out.print("返回了结果 
    ");
                }
    
                @Override
                public void onError(Throwable throwable) {
    
                }
    
                @Override
                public void onCompleted() {
    
                }
            };
            StreamObserver<Request> result = javaGrpcClient.runAsync(o -> o.methodDoubleStream(responseObserver));
            result.onNext(request);
            result.onNext(request);
            result.onNext(request);
            result.onCompleted();
    
            try {
                Thread.sleep(600000);
            }
            catch (Exception ex){}
        }
    

      

    双向流也是异步的,所以要等待

    try {
        Thread.sleep(600000);
    }
    catch (Exception ex){}
    

      

     服务端结果

    客户端结果

    完结!撒花!

  • 相关阅读:
    SecureCRT
    Jsoup 标签选择器 选择img标签中src的值
    使用Jsoup 爬取网易首页所有的图片
    java自定义类型 比较排序 Comparator接口
    eclipse下导入jdk源码
    java爬虫--使用正则表达式获取网页中的email
    Java正则表达式--Matcher.group函数的用法
    使用org.jsoup.Jsoup下载网络中的图片
    Tomcat中的Session小结
    关于JAVA中的static方法、并发问题以及JAVA运行时内存模型
  • 原文地址:https://www.cnblogs.com/gutousu/p/9970288.html
Copyright © 2011-2022 走看看