  • consumer filter

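    The buildInvokerChain method in ProtocolFilterWrapper links the Filters into a chain. When a call is made, the filters run one after another, and the last filter finally calls the original invoker.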

    //ProtocolFilterWrapper
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
        if (filters.size() > 0) {
            for (int i = filters.size() - 1; i >= 0; i --) {
                final Filter filter = filters.get(i);
                final Invoker<T> next = last;
                last = new Invoker<T>() {
                    public Class<T> getInterface() {
                        return invoker.getInterface();
                    }
                    public URL getUrl() {
                        return invoker.getUrl();
                    }
                    public boolean isAvailable() {
                        return invoker.isAvailable();
                    }
                    public Result invoke(Invocation invocation) throws RpcException {
                        return filter.invoke(next, invocation);
                    }
                    public void destroy() {
                        invoker.destroy();
                    }
                    @Override
                    public String toString() {
                        return invoker.toString();
                    }
                };
            }
        }
        return last;
    }

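    One way to picture last in the code above: each anonymous inner class new Invoker<T>() { ... } captures three final references, namely Filter filter, Invoker<T> next, and the original Invoker<T> invoker. Because every wrapper's next points at the wrapper built in the previous loop iteration, the filters end up linked into a chain, with the first filter in the list outermost.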
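    The wrapping order is easier to see in a stripped-down form. The following is a minimal sketch, not Dubbo code: SimpleInvoker, SimpleFilter, and FilterChainSketch are hypothetical stand-ins introduced only for illustration, and requests are plain strings. It wraps the target from the last filter to the first, as the loop above does, and records the order in which the filters are entered.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins for Dubbo's Invoker and Filter.
interface SimpleInvoker {
    String invoke(String request);
}

interface SimpleFilter {
    String invoke(SimpleInvoker next, String request);
}

class FilterChainSketch {
    // Records the order in which filters and the target are entered.
    static final List<String> TRACE = new ArrayList<>();

    // Wrap the target from the last filter to the first,
    // so that the first filter in the list ends up outermost.
    static SimpleInvoker buildChain(SimpleInvoker target, List<SimpleFilter> filters) {
        SimpleInvoker last = target;
        for (int i = filters.size() - 1; i >= 0; i--) {
            final SimpleFilter filter = filters.get(i);
            final SimpleInvoker next = last;
            last = request -> filter.invoke(next, request);
        }
        return last;
    }

    // A filter that only records its name and passes the call on.
    static SimpleFilter named(String name) {
        return (next, request) -> {
            TRACE.add(name);
            return next.invoke(request);
        };
    }

    public static void main(String[] args) {
        SimpleInvoker target = request -> {
            TRACE.add("target");
            return "echo:" + request;
        };
        SimpleInvoker chain = buildChain(target, Arrays.asList(named("A"), named("B"), named("C")));
        System.out.println(chain.invoke("hi")); // prints "echo:hi"
        System.out.println(TRACE);              // prints "[A, B, C, target]"
    }
}
```

    Iterating backwards is what makes the execution order match the list order: the last wrapper created is the one the caller holds.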

    The consumer side has three filters in total: ConsumerContextFilter, MonitorFilter, and FutureFilter.

    1. ConsumerContextFilter sets up the RpcContext before the call and cleans it up afterwards

    @Activate(group = Constants.CONSUMER, order = -10000)
    public class ConsumerContextFilter implements Filter {
    
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            RpcContext.getContext()
                    .setInvoker(invoker)
                    .setInvocation(invocation)
                    .setLocalAddress(NetUtils.getLocalHost(), 0)
                    .setRemoteAddress(invoker.getUrl().getHost(), 
                                      invoker.getUrl().getPort());
            if (invocation instanceof RpcInvocation) {
                ((RpcInvocation)invocation).setInvoker(invoker);
            }
            try {
                return invoker.invoke(invocation);
            } finally {
                RpcContext.getContext().clearAttachments();
            }
        }
    
    }
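    The same set-then-clear-in-finally pattern can be tried in isolation. This is a minimal sketch under stated assumptions: CallContext is a hypothetical thread-local holder that only loosely plays the role of RpcContext, and ContextFilterSketch.invokeWithContext is an invented helper, not a Dubbo API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical thread-local context, loosely modelled on the role RpcContext plays.
class CallContext {
    private static final ThreadLocal<CallContext> LOCAL = ThreadLocal.withInitial(CallContext::new);

    final Map<String, String> attachments = new HashMap<>();
    String remoteHost;

    static CallContext get() {
        return LOCAL.get();
    }
}

class ContextFilterSketch {
    // Hypothetical stand-in for the downstream invoker.
    interface Call {
        String run();
    }

    static String invokeWithContext(String remoteHost, Call call) {
        // Populate the context before handing over to the next step.
        CallContext.get().remoteHost = remoteHost;
        try {
            return call.run();
        } finally {
            // Runs on normal return and on exception, so attachments never leak into the next call.
            CallContext.get().attachments.clear();
        }
    }

    public static void main(String[] args) {
        CallContext.get().attachments.put("traceId", "42");
        String seen = invokeWithContext("10.0.0.1", () -> CallContext.get().attachments.get("traceId"));
        System.out.println(seen);
        System.out.println(CallContext.get().attachments.isEmpty());
    }
}
```

    As in the filter above, only the attachments are cleared; the other context fields are simply overwritten by the next call.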

    2. MonitorFilter collects statistics on interface method calls

    3. FutureFilter handles asynchronous calls

    @Activate(group = Constants.CONSUMER)
    public class FutureFilter implements Filter {}

    In DubboInvoker.doInvoke, if the call is asynchronous, the Future is stored in the RpcContext and an empty RpcResult is returned immediately:

    ResponseFuture future = currentClient.request(inv, timeout) ;
    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
    return new RpcResult();

    If the call is synchronous, the caller blocks on future.get() until the response arrives:

    RpcContext.getContext().setFuture(null);
    ResponseFuture future = currentClient.request(inv, timeout);
    Result result = (Result)future.get();
    return  result;
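    The difference between the two branches can be sketched with the JDK's CompletableFuture. This is an illustration under assumptions, not Dubbo's implementation: AsyncInvokeSketch, CURRENT_FUTURE (standing in for the future slot in RpcContext), and request (standing in for the remote call) are all hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

class AsyncInvokeSketch {
    // Hypothetical stand-in for the future slot kept in RpcContext.
    static final ThreadLocal<CompletableFuture<String>> CURRENT_FUTURE = new ThreadLocal<>();

    // Hypothetical remote call that completes on another thread.
    static CompletableFuture<String> request(String msg) {
        return CompletableFuture.supplyAsync(() -> "reply:" + msg);
    }

    static String doInvoke(String msg, boolean async) {
        if (async) {
            // Async: park the future in the context and return an empty result at once.
            CURRENT_FUTURE.set(request(msg));
            return null;
        }
        // Sync: clear any stale future, then block until the reply arrives.
        CURRENT_FUTURE.set(null);
        return request(msg).join();
    }

    public static void main(String[] args) {
        System.out.println(doInvoke("a", false));          // the reply itself
        System.out.println(doInvoke("b", true));           // empty result (null in this sketch)
        System.out.println(CURRENT_FUTURE.get().join());   // the caller fetches the reply later
    }
}
```

    In the async branch the caller decides when, or whether, to wait, by picking the future up from the context.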

    FutureAdapter implements the Future interface and wraps a ResponseFuture object.
    ResponseFuture is an interface; DefaultFuture is its main implementation.

    FutureFilter.asyncCallback() registers a callback on the DefaultFuture:

    //FutureFilter.asyncCallback
    ResponseFuture future = ((FutureAdapter<?>)f).getFuture();
    future.setCallback(new ResponseCallback() {
        public void done(Object rpcResult) {
            ...
        }
        public void caught(Throwable exception) {
            ...
        }
    });
    //DefaultFuture
    public void setCallback(ResponseCallback callback) {
        if (isDone()) {
            invokeCallback(callback);
        } else {
            boolean isdone = false;
            lock.lock();
            try{
                if (!isDone()) {
                    this.callback = callback;
                } else {
                    isdone = true;
                }
            }finally {
                lock.unlock();
            }
            if (isdone){
                invokeCallback(callback);
            }
        }
    }
    private void invokeCallback(ResponseCallback c){
        ResponseCallback callbackCopy = c;
        if (callbackCopy == null){
            throw new NullPointerException("callback cannot be null.");
        }
        c = null;
        Response res = response;
        if (res == null) {
            throw new IllegalStateException("response cannot be null. url:"+channel.getUrl());
        }
        
        if (res.getStatus() == Response.OK) {
            try {
                callbackCopy.done(res.getResult());
            } catch (Exception e) {
                logger.error("callback invoke error .reasult:" + res.getResult() + ",url:" + channel.getUrl(), e);
            }
        } else if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) {
            try {
                TimeoutException te = new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage());
                callbackCopy.caught(te);
            } catch (Exception e) {
                logger.error("callback invoke error ,url:" + channel.getUrl(), e);
            }
        } else {
            try {
                RuntimeException re = new RuntimeException(res.getErrorMessage());
                callbackCopy.caught(re);
            } catch (Exception e) {
                logger.error("callback invoke error ,url:" + channel.getUrl(), e);
            }
        }
    }
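    The double check of isDone() in setCallback guards against the response arriving between the first check and the assignment of the callback. The sketch below reproduces that registration logic in a self-contained form. CallbackFutureSketch is a hypothetical class, responses are plain strings, and the received method is an assumption about what the receiving side could look like; it is not taken from the code above.

```java
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;

class CallbackFutureSketch {
    private final ReentrantLock lock = new ReentrantLock();
    private volatile String response;            // null until the reply arrives
    private volatile Consumer<String> callback;  // null until a callback is registered

    boolean isDone() {
        return response != null;
    }

    // Mirrors the registration logic: fast path, then a second check under the lock.
    void setCallback(Consumer<String> cb) {
        if (isDone()) {
            cb.accept(response);
            return;
        }
        boolean done = false;
        lock.lock();
        try {
            if (!isDone()) {
                callback = cb;   // the reply has not arrived; the receiver will fire it
            } else {
                done = true;     // the reply arrived between the two checks
            }
        } finally {
            lock.unlock();
        }
        if (done) {
            cb.accept(response);
        }
    }

    // Assumed receiving side: store the reply under the same lock, then fire any stored callback once.
    void received(String res) {
        Consumer<String> cb;
        lock.lock();
        try {
            response = res;
            cb = callback;
            callback = null;
        } finally {
            lock.unlock();
        }
        if (cb != null) {
            cb.accept(res);
        }
    }

    public static void main(String[] args) {
        CallbackFutureSketch early = new CallbackFutureSketch();
        early.setCallback(r -> System.out.println("registered first: " + r));
        early.received("ok");

        CallbackFutureSketch late = new CallbackFutureSketch();
        late.received("ok");
        late.setCallback(r -> System.out.println("registered after completion: " + r));
    }
}
```

    Whichever side comes second, registration or response, is the one that runs the callback, and it does so outside the lock.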
  • Original article: https://www.cnblogs.com/allenwas3/p/8352157.html