zoukankan      html  css  js  c++  java
  • dubbo中的切换不同transport怎么做到的?处理消息的handler链是咋样的?在哪里唤醒之前阻塞在发送request以后的业务线程?

    server和client都是以下方法得到的,Exchanger这个接口只有这么一个实现,将来可能其他更加复杂获得server和cliet方式,以下这种是目前唯一的

    public class HeaderExchanger implements Exchanger {
    
        public static final String NAME = "header";
    
        public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
        }
    
        public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
            return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }
    
    }
    

      




    connect和 bind得到的最终的server和client,Transporters.connect调用方法是以下两个:







    public static Client connect(URL url, ChannelHandler... handlers) throws RemotingException {
            if (url == null) {
                throw new IllegalArgumentException("url == null");
            }
            ChannelHandler handler;
            if (handlers == null || handlers.length == 0) {
                handler = new ChannelHandlerAdapter();
            } else if (handlers.length == 1) {
                handler = handlers[0];
            } else {
                handler = new ChannelHandlerDispatcher(handlers);
            }
            return getTransporter().connect(url, handler);
        }
    
        public static Transporter getTransporter() {
            return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension();
        }
    

      

    通过spi的自适应扩展作为生产实例的中间工厂,这个工厂根据url参数得到不同的transport,如果url里面指定netty4,那么就可以得到netty4的client
    切换不同transport怎么做到的?那就是通过自适应扩展加url自由切换。

    回到最上面的,通过bind已经拿到最终的nettyserver,继续包裹了一层HeaderExchangeServer,这里面主要处理心跳、channel、future的封装,以及屏蔽不同类型的server(netty、netty4等等)。



    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
     继续看这里,这个handler很长,对于dubbo来说,最里面这个handler是通过CreateServer方法中的server = Exchangers.bind(url, requestHandler)这个传入进来的,
    这个requestHandler就是dubboprotocl里面内部的,这个handler只有reply方法,作用就是执行doinvoe,也就是真正履行provider义务的地方,也只会在request来的时候才会被调用。

    HeaderExchangeHandler用来处理request、response的,decodehandler用来解码。这handler到这里是不是已经结束了?显然不是,netty4server初始化的时候:





    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }
    
    
    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                .getAdaptiveExtension().dispatch(handler, url)));
    }
    

      




    日志里面经常看到的 “DubboServerHandler”就是这个SERVER_THREAD_POOL_NAME。handler链上又有下面这几个:
    MultiMessageHandler:针对multimessage类型消息,在received做拦截。
    HeartbeatHandler:针对心跳在received做拦截。
    通过dispatch出来的默认的allchannelhandler:对所有io事件做处理,大部分任务都扔到线程池里面做异步处理,防止阻塞netty线程。这个业务线程池的类型、个数也是url指定。

    所以这个handler链路上面从外到内:MultiMessageHandler HeartbeatHandler Allchannelhandler DecodeHandler HeadExchangeHandler dubboprotocol里面自带的带有reply方法的handler。

    对于netty来说,消息在到达这些handler处理以前,已经被netty的编解码handler处理了,所以DecodeHandler可有可无

    HeadExchangeHandler看起来意义不大,其实这个是用来阻断传递到dubbo里面的handler的,它是最后一道防线,用来决定要不要丢给dubbo做reply、doinvoke操作的,最重要的方法:




    public void received(Channel channel, Object message) throws RemotingException {
            channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
            ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
            try {
                if (message instanceof Request) {
                    // handle request.
                    Request request = (Request) message;
                    if (request.isEvent()) {
                        handlerEvent(channel, request);
                    } else {
                        if (request.isTwoWay()) {
                            Response response = handleRequest(exchangeChannel, request);
                            channel.send(response);
                        } else {
                            handler.received(exchangeChannel, request.getData());
                        }
                    }
                } else if (message instanceof Response) {
                    handleResponse(channel, (Response) message);
                } else if (message instanceof String) {
                    if (isClientSide(channel)) {
                        Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
                        logger.error(e.getMessage(), e);
                    } else {
                        String echo = handler.telnet(channel, (String) message);
                        if (echo != null && echo.length() > 0) {
                            channel.send(echo);
                        }
                    }
                } else {
                    handler.received(exchangeChannel, message);
                }
            } finally {
                HeaderExchangeChannel.removeChannelIfDisconnected(channel);
            }
        }
    

      

    如果two-way、对方给我request,需要返回response,那么handleRequest会调用dubboprotocol的reply处理这个request
    如果对方给我的response,那么调用handleResponse(channel, (Response) message);
    static void handleResponse(Channel channel, Response response) throws RemotingException {
            if (response != null && !response.isHeartbeat()) {
                DefaultFuture.received(channel, response);
            }
        }
    

      

    public static void received(Channel channel, Response response) {
            try {
                DefaultFuture future = FUTURES.remove(response.getId());
                if (future != null) {
                    future.doReceived(response);
                } else {
                    logger.warn("The timeout response finally returned at "
                            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                            + ", response " + response
                            + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                            + " -> " + channel.getRemoteAddress()));
                }
            } finally {
                CHANNELS.remove(response.getId());
            }
        }
    

     

     收到response以后,在这里找到对应的future,通过future唤醒之前阻塞在发送request以后的业务线程:

    private void doReceived(Response res) {
            lock.lock();
            try {
                response = res;
                if (done != null) {
                    done.signal();
                }
            } finally {
                lock.unlock();
            }
            if (callback != null) {
                invokeCallback(callback);
            }
        }
    

      











  • 相关阅读:
    Ganglia Install
    [ZZ]perl一揽子特殊变量
    点滴积累【C#】---C#实现上传word将路径保存到数据库,文件保存到服务器。并且按照名称读取服务器的word
    点滴积累【C#】---C#实现上传照片到物理路径,并且将地址保存到数据库,
    点滴积累【C#】---C#实现上传word以流形式保存到数据库和读取数据库中的word文件。
    点滴积累【C#】---将Excel数据导入到数据库
    点滴积累【C#】---TreeView读取数据库
    DIV内英文或者数字不换行的问题 解决办法
    Rabbitmq中rabbitmqctl的常用命令
    Uploadify参数详解
  • 原文地址:https://www.cnblogs.com/notlate/p/10205043.html
Copyright © 2011-2022 走看看