  • gRPC error handling

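    Exception handling is something every RPC call has to account for. This post walks through how gRPC handles errors. Note: all the examples here are adapted from the interfaces used in earlier posts.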

    Simple (unary) RPC

    In simple mode, one request maps to one response.

    //Server
    public void simpleHello(ProtoObj.Person request,
                      StreamObserver<ProtoObj.Result> responseObserver) {
        System.out.println(request.getMyName()+" calling");
        //Report the error as a Status wrapped in an exception. If a plain Throwable is passed instead, the client cannot read the error details
        responseObserver.onError(Status.INTERNAL.withDescription("error desc").asRuntimeException());
        //Calling onError already ends the call, so there is no need to call onCompleted
        //responseObserver.onCompleted();
    }
    //Client
    @Test
    public void  simple() throws InterruptedException {
        //Newer grpc-java releases use the no-argument usePlaintext() instead of usePlaintext(true)
        final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();
        HelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel);

        ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();
        //simple
        System.out.println("---simple rpc---");
        try {
            //Wrap the remote call in try/catch
            System.out.println(blockingStub.simpleHello(person).getString());
        }catch(Exception e){
            //Convert the exception to a Status to get the error details
            Status status = Status.fromThrowable(e);
            status.asException().printStackTrace();
        }
        channel.shutdown();
    }
    //Output
    ---simple rpc---
    io.grpc.StatusException: INTERNAL: error desc
    	at io.grpc.Status.asException(Status.java:548)
    	at blog.HelloClient.simple(HelloClient.java:100)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	...
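
    Why a bare Throwable tells the client nothing can be modelled without gRPC at all. The sketch below is plain Java and not the grpc-java API: CodedException and codeOf are hypothetical stand-ins for StatusRuntimeException and Status.fromThrowable. It assumes the documented behaviour of Status.fromThrowable, which looks through the causal chain for a status and falls back to UNKNOWN when none is found.

```java
public class StatusLookupSketch {
    // Hypothetical stand-in for an exception that carries a status code.
    static class CodedException extends RuntimeException {
        final String code;

        CodedException(String code, String description) {
            super(code + ": " + description);
            this.code = code;
        }
    }

    // Walk the causal chain and return the first code found, else "UNKNOWN".
    static String codeOf(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof CodedException) {
                return ((CodedException) cur).code;
            }
        }
        return "UNKNOWN";
    }

    public static void main(String[] args) {
        // A coded exception is still found when another exception wraps it.
        Throwable wrapped = new IllegalStateException("call failed",
                new CodedException("INTERNAL", "error desc"));
        System.out.println(codeOf(wrapped));                       // INTERNAL
        // A plain exception has no code anywhere in its chain.
        System.out.println(codeOf(new Exception("custom error"))); // UNKNOWN
    }
}
```

    In this model the message of the plain exception never reaches the caller as a code, which mirrors why the server example wraps its error in a Status.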
    

    Client streaming

    In client streaming, multiple requests map to a single response.

    //Server
    @Override
    public StreamObserver<ProtoObj.Person> clientStreamHello(
           final StreamObserver<ProtoObj.Result> responseObserver) {
       return new StreamObserver<ProtoObj.Person>(){
           private ProtoObj.Result.Builder builder=ProtoObj.Result.newBuilder();
           @Override
           public void onNext(ProtoObj.Person value) {
               builder.setString(builder.getString() +"," + value.getMyName());
           }
    
           @Override
           public void onError(Throwable t) {
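               //Note: a plain Exception carries no Status, so the client sees UNKNOWN without the "custom error" text
               responseObserver.onError(new Exception("custom error"));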
           }
    
           @Override
           public void onCompleted() {
               builder.setString("hello"+builder.getString());
               //Return an error
               responseObserver.onError(Status.INTERNAL.withDescription("error desc").asRuntimeException());
               //responseObserver.onNext(builder.build());
               //responseObserver.onCompleted();
           }
       };
    }
    //Client
    @Test
    public void client() throws InterruptedException {
    
        final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();
        HelloServiceGrpc.HelloServiceStub asyncStub = HelloServiceGrpc.newStub(channel);
    
        ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();
        //Use a latch to detect when the call has finished
        final CountDownLatch latch = new CountDownLatch(1);
    
        //client side
        System.out.println("---client stream rpc---");
        StreamObserver<ProtoObj.Result> responseObserver = new StreamObserver<ProtoObj.Result>() {
            @Override
            public void onNext(ProtoObj.Result result) {
                System.out.println("client stream--" + result.getString());
            }
    
            @Override
            public void onError(Throwable t) {
                //Handle the error first, then release the waiting test thread
                Status status = Status.fromThrowable(t);
                status.asException().printStackTrace();
                latch.countDown();
            }
    
            @Override
            public void onCompleted() {
                latch.countDown();
            }
        };
        //Send the requests
        StreamObserver<ProtoObj.Person> clientStreamObserver = asyncStub.clientStreamHello(responseObserver);
        clientStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World").build());
        clientStreamObserver.onNext(ProtoObj.Person.newBuilder().setMyName("World2").build());
        clientStreamObserver.onCompleted();
        //Wait on the latch until the call has finished
        if (!Uninterruptibles.awaitUninterruptibly(latch, 1, TimeUnit.SECONDS)) {
            throw new RuntimeException("timeout!");
        }
        channel.shutdown();
    
    }
    //Output
    ---client stream rpc---
    io.grpc.StatusException: INTERNAL: error desc
    	at io.grpc.Status.asException(Status.java:548)
    	at blog.HelloClient$3.onError(HelloClient.java:148)
    	at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:392)
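
    The latch handshake used by the asynchronous client can be tried in isolation. The following is a minimal sketch in plain Java, with a hypothetical Callback interface standing in for the response observer and a worker thread standing in for the gRPC callback thread. It uses CountDownLatch.await with a timeout rather than Guava's Uninterruptibles.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class LatchWaitSketch {
    // Hypothetical callback type standing in for a response observer.
    interface Callback {
        void onError(Throwable t);
        void onCompleted();
    }

    // Fires one terminal event from another thread and waits for it.
    // Returns the error message, "completed", "timeout", or "interrupted".
    static String runAndWait(boolean fail, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>();
        Callback callback = new Callback() {
            @Override public void onError(Throwable t) {
                outcome.set(t.getMessage()); // record the result first
                latch.countDown();           // then release the waiter
            }
            @Override public void onCompleted() {
                outcome.set("completed");
                latch.countDown();
            }
        };
        new Thread(() -> {
            if (fail) {
                callback.onError(new RuntimeException("INTERNAL: error desc"));
            } else {
                callback.onCompleted();
            }
        }).start();
        try {
            // Both terminal callbacks count down, so either one ends the wait.
            if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return "timeout";
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        System.out.println(runAndWait(true, 1000));
        System.out.println(runAndWait(false, 1000));
    }
}
```

    Counting down in both onError and onCompleted matters: if only one of them released the latch, the other outcome would always end in a timeout.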
    

    Server streaming

    In server streaming, a single request maps to multiple responses.

    //Server
    @Override
    public void serverStreamHello(ProtoObj.Person request,
                            StreamObserver<ProtoObj.Result> responseObserver) {
        System.out.println(request.getMyName()+" calling");
        responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello, "+request.getMyName()).build());
        responseObserver.onError(Status.INTERNAL.withDescription("error desc").asRuntimeException());
        //After onError, onNext must not be called again
        //responseObserver.onNext(ProtoObj.Result.newBuilder().setString("hello3, "+request.getMyName()).build());
        //responseObserver.onCompleted();
    }
    //Client
    @Test
    public void server(){
        final ManagedChannel channel = ManagedChannelBuilder.forAddress("127.0.0.1", 8080).usePlaintext(true).build();
        HelloServiceGrpc.HelloServiceBlockingStub blockingStub = HelloServiceGrpc.newBlockingStub(channel);

        ProtoObj.Person person = ProtoObj.Person.newBuilder().setMyName("World").build();

        //server side
        System.out.println("---server stream rpc---");
        Iterator<ProtoObj.Result> it = blockingStub.serverStreamHello(person);
        try{
            //Responses sent before the error are still delivered; the error surfaces while iterating
            while (it.hasNext()) {
                System.out.print(it.next());
            }
        }catch (Exception e){
            Status status = Status.fromThrowable(e);
            status.asException().printStackTrace();
        }
        channel.shutdown();
    }
    //Output
    ---server stream rpc---
    string: "hello, World"
    io.grpc.StatusException: INTERNAL: error desc
    	at io.grpc.Status.asException(Status.java:548)
    	at blog.HelloClient.server(HelloClient.java:122)
    	...
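
    The output above shows one response arriving before the error. That ordering can be reproduced with a plain Java iterator. This is a sketch only: failingAfter is a hypothetical helper, and it assumes the failure surfaces from hasNext() once the buffered items are exhausted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PartialStreamSketch {
    // Hypothetical iterator: yields the items, then fails the way a stream
    // that was closed with an error would.
    static Iterator<String> failingAfter(List<String> items, String error) {
        Iterator<String> inner = items.iterator();
        return new Iterator<String>() {
            @Override public boolean hasNext() {
                if (inner.hasNext()) {
                    return true;
                }
                throw new RuntimeException(error);
            }
            @Override public String next() {
                return inner.next();
            }
        };
    }

    // Consumes the iterator and keeps everything received before the failure.
    static List<String> drain(Iterator<String> it) {
        List<String> log = new ArrayList<>();
        try {
            while (it.hasNext()) {
                log.add(it.next());
            }
        } catch (RuntimeException e) {
            log.add("error: " + e.getMessage());
        }
        return log;
    }

    public static void main(String[] args) {
        Iterator<String> it = failingAfter(
                Arrays.asList("hello, World"), "INTERNAL: error desc");
        for (String line : drain(it)) {
            System.out.println(line);
        }
    }
}
```

    Keeping the try/catch around the whole loop, as the client does, means responses that arrived before the error are still processed.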
    

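    Bidirectional streaming

    In bidirectional streaming, multiple requests map to multiple responses. Error handling is similar to client streaming (it uses the asynchronous stub from newStub), so it is not repeated here.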
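
    In every mode shown in this post, onError is a terminal event: nothing may be sent on the same stream after it. A small guard makes that rule explicit. The sketch below is plain Java with a hypothetical Observer interface that mirrors the onNext / onError / onCompleted shape. Silently dropping late events is a design choice of this sketch, not a description of what grpc-java does.

```java
import java.util.ArrayList;
import java.util.List;

public class TerminalGuardSketch {
    // Hypothetical observer shape mirroring onNext / onError / onCompleted.
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    // Wraps an observer and drops every event after the first terminal one.
    static <T> Observer<T> guard(Observer<T> delegate) {
        return new Observer<T>() {
            private boolean done;

            @Override public synchronized void onNext(T value) {
                if (!done) {
                    delegate.onNext(value);
                }
            }
            @Override public synchronized void onError(Throwable t) {
                if (done) {
                    return;
                }
                done = true;
                delegate.onError(t);
            }
            @Override public synchronized void onCompleted() {
                if (done) {
                    return;
                }
                done = true;
                delegate.onCompleted();
            }
        };
    }

    // Replays a next / error / next / completed sequence and logs what gets through.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Observer<String> out = guard(new Observer<String>() {
            @Override public void onNext(String v) { log.add("next:" + v); }
            @Override public void onError(Throwable t) { log.add("error:" + t.getMessage()); }
            @Override public void onCompleted() { log.add("completed"); }
        });
        out.onNext("hello");
        out.onError(new RuntimeException("error desc"));
        out.onNext("hello3"); // dropped: the stream already ended with an error
        out.onCompleted();    // dropped for the same reason
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [next:hello, error:error desc]
    }
}
```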

  • Original article: https://www.cnblogs.com/resentment/p/6883153.html