zoukankan      html  css  js  c++  java
  • dubbo 异步回调

    dubbo 异步回调的使用

    业务接口:

    public interface HelloService {
        String sayHello();
        void sayHi(String name);
    }

    回调服务类:

    public class CbService {
        // onreturn 函数的参数是有限定的,细节下面提及。
        public void onreturn(String str) {
            System.out.println("onreturn:" + str);
        }
        public void onthrow(Throwable ex) {
            System.out.println("onthrow");
        }
        public String oninvoke() {
            System.out.println("oninvoke");
            return null;
        }
    }

    xml 配置:(oninvoke 暂时没有配置好)

    <bean id="cbService" class="com.zhang.CbService"></bean>
    <dubbo:reference id="hello"
        loadbalance="roundrobin" check="false" interface="com.zhang.HelloService" >
        <dubbo:method name="sayHello"
            async="true"
            onreturn="cbService.onreturn"
            onthrow="cbService.onthrow">
        </dubbo:method>
    </dubbo:reference>


    1. 异步回调的源头从 FutureFilter 发起,这是一个 consumer 端的 filter,它为 Future 设置回调函数。

    @Activate(group = Constants.CONSUMER)
    public class FutureFilter implements Filter {
    
        public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
            final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);
            
            fireInvokeCallback(invoker, invocation);
            // 发起调用请求
            Result result = invoker.invoke(invocation);
            
            if (isAsync) {
                // 注册异步回调函数
                asyncCallback(invoker, invocation);
            } else {
                syncCallback(invoker, invocation, result);
            }
            return result;
        }
        
        // 为 Future 设置 callback
        private void asyncCallback(final Invoker<?> invoker, final Invocation invocation) {
            Future<?> f = RpcContext.getContext().getFuture();
            if (f instanceof FutureAdapter) {
                ResponseFuture future = ((FutureAdapter<?>)f).getFuture();
                future.setCallback(new ResponseCallback() {
                    public void done(Object rpcResult) {
                        if (rpcResult == null){
                            logger.error(new IllegalStateException("invalid result value : null, expected "+Result.class.getName()));
                            return;
                        }
                        ///must be rpcResult
                        if (! (rpcResult instanceof Result)){
                            logger.error(new IllegalStateException("invalid result type :" + rpcResult.getClass() + ", expected "+Result.class.getName()));
                            return;
                        }
                        Result result = (Result) rpcResult;
                        if (result.hasException()) {
                            fireThrowCallback(invoker, invocation, result.getException());
                        } else {
                            fireReturnCallback(invoker, invocation, result.getValue());
                        }
                    }
                    public void caught(Throwable exception) {
                        fireThrowCallback(invoker, invocation, exception);
                    }
                });
            }
        }
        
        private void fireReturnCallback(final Invoker<?> invoker, final Invocation invocation, final Object result) {
            final Method onReturnMethod = (Method)StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_METHOD_KEY));
            final Object onReturnInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_RETURN_INSTANCE_KEY));
    
            //not set onreturn callback
            if (onReturnMethod == null  &&  onReturnInst == null ){
                return ;
            }
            
            if (onReturnMethod == null  ||  onReturnInst == null ){
                throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() +" has a onreturn callback config , but no such "+(onReturnMethod == null ? "method" : "instance")+" found. url:"+invoker.getUrl());
            }
            if (onReturnMethod != null && ! onReturnMethod.isAccessible()) {
                onReturnMethod.setAccessible(true);
            }
            
            Object[] args = invocation.getArguments();
            Object[] params ;
            Class<?>[] rParaTypes = onReturnMethod.getParameterTypes() ;
            if (rParaTypes.length >1 ) {
                if (rParaTypes.length == 2 && rParaTypes[1].isAssignableFrom(Object[].class)){
                    params = new Object[2];
                    params[0] = result;
                    params[1] = args ;
                }else {
                    params = new Object[args.length + 1];
                    params[0] = result;
                    System.arraycopy(args, 0, params, 1, args.length);
                }
            } else {
                params = new Object[] { result };
            }
            // 经过前面的 if else 后,必定有 params[0] = result;
            // result 是 RpcResult 的 value,即 provider 接口的返回值。
            // 所以,CbService 的 onreturn 函数的第一个参数必须是接口的返回值。
            try {
                onReturnMethod.invoke(onReturnInst, params);
            } catch (InvocationTargetException e) {
                fireThrowCallback(invoker, invocation, e.getTargetException());
            } catch (Throwable e) {
                fireThrowCallback(invoker, invocation, e);
            }
        }
    }


    2. 当 consumer 收到服务端的响应时,触发回调函数,调用栈如下:

  • 相关阅读:
    modesim仿真
    EP3C系列FPGA的JTAG检测不了,JTAG下载失败,AS可以下载,下载完成后不执行程序
    本机修改虚拟机linux中的代码文件
    linux中的diff命令
    php中的elseif和else if
    php将数据写入另外一个文件
    IE6下的png不透明问题
    cookie的封装
    php从接口获取数据转成可以用的数组或其他(含转换编码)
    如何让后加载的元素被一开始就有的css样式渲染成功(强制提升css优先级)
  • 原文地址:https://www.cnblogs.com/allenwas3/p/9384956.html
Copyright © 2011-2022 走看看