zoukankan      html  css  js  c++  java
  • dubbo源码阅读-远程暴露(七)之Transport

    接口定义

    @SPI("netty") // "netty" is the default extension name
    public interface Transporter {
    
        /**
         * Bind a server.
         *
         * The adaptive extension for this call is looked up from the URL using the
         * keys {@code Constants.SERVER_KEY} and {@code Constants.TRANSPORTER_KEY},
         * as listed in the {@code @Adaptive} annotation below.
         *
         * @param url     server url
         * @param handler channel handler the bound server will use
         * @return server
         * @throws RemotingException
         * @see com.alibaba.dubbo.remoting.Transporters#bind(URL, ChannelHandler...)
         */
        @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY})
        Server bind(URL url, ChannelHandler handler) throws RemotingException;
    
        /**
         * Connect to a server.
         *
         * The adaptive extension for this call is looked up from the URL using the
         * keys {@code Constants.CLIENT_KEY} and {@code Constants.TRANSPORTER_KEY},
         * as listed in the {@code @Adaptive} annotation below.
         *
         * @param url     server url
         * @param handler channel handler the client will use
         * @return client
         * @throws RemotingException
         * @see com.alibaba.dubbo.remoting.Transporters#connect(URL, ChannelHandler...)
         */
        @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY})
        Client connect(URL url, ChannelHandler handler) throws RemotingException;
    
    }

    类图

    说明

    接上一篇《dubbo源码阅读-远程暴露(七)之Exchangers》

    @Override
        public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
            /*
             * Built inside-out, in this order:
             *   1. the caller's handler is wrapped in a HeaderExchangeHandler and then
             *      in a DecodeHandler;
             *   2. <1> Transporters.bind(...) binds a Server for the url with that
             *      decorated handler;
             *   3. <6> the resulting Server is wrapped in a HeaderExchangeServer.
             *
             * Note carried over from the original author (not verifiable from this
             * snippet): HeaderExchangeServer is said to close idle channels and to
             * notify clients when the server shuts down so they stop sending requests.
             */
            return new HeaderExchangeServer(
                    Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
        }

    Transporters

    <1>bind

    com.alibaba.dubbo.remoting.Transporters#bind(com.alibaba.dubbo.common.URL, com.alibaba.dubbo.remoting.ChannelHandler...)

        /**
         * Binds a server for the given url through the adaptive {@code Transporter}.
         *
         * @param url      server url, must not be null
         * @param handlers one or more channel handlers; a single handler is used as-is,
         *                 several are combined into one {@code ChannelHandlerDispatcher}.
         *                 (Per the original author's note, the handler arriving here has
         *                 been decorated as DecodeHandler -> HeaderExchangeHandler ->
         *                 DubboProtocol#requestHandler.)
         * @return the server returned by the transporter
         * @throws IllegalArgumentException if url is null, or handlers is null or empty
         * @throws RemotingException        propagated from the transporter's bind
         */
        public static Server bind(URL url, ChannelHandler... handlers) throws RemotingException {
            if (null == url) {
                throw new IllegalArgumentException("url == null");
            }
            if (null == handlers || handlers.length < 1) {
                throw new IllegalArgumentException("handlers == null");
            }
            // One handler needs no wrapper; several are fanned out through a dispatcher.
            final ChannelHandler effectiveHandler = handlers.length == 1
                    ? handlers[0]
                    : new ChannelHandlerDispatcher(handlers);
            // <2> getTransporter resolves the adaptive Transporter, <3> bind delegates to it.
            return getTransporter().bind(url, effectiveHandler);
        }

    <2>getTransporter

    com.alibaba.dubbo.remoting.Transporters#getTransporter

     public static Transporter getTransporter() {
            // Transporter is an SPI extension point; look up its loader, then ask for
            // the adaptive extension.
            ExtensionLoader<Transporter> loader = ExtensionLoader.getExtensionLoader(Transporter.class);
            return loader.getAdaptiveExtension();
        }

    NettyTransporter

      /**
         * Binds by constructing a new {@code NettyServer} for the url.
         *
         * @param url      server url
         * @param listener channel handler handed to the server. (Per the original
         *                 author's note, it has already been decorated as DecodeHandler ->
         *                 HeaderExchangeHandler -> DubboProtocol#requestHandler.)
         * @return the newly created server
         * @throws RemotingException propagated from the NettyServer constructor
         */
        @Override
        public Server bind(URL url, ChannelHandler listener) throws RemotingException {
            // <3> the NettyServer constructor does the actual work
            Server nettyServer = new NettyServer(url, listener);
            return nettyServer;
        }

    <3>NettyServer

    com.alibaba.dubbo.remoting.transport.netty4.NettyServer#NettyServer

     /**
         * Creates the server by delegating everything to the superclass constructor.
         *
         * Before delegating, the url is passed through
         * {@code ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)} and the handler
         * is wrapped with {@code ChannelHandlers.wrap} (article step <4>) using that url.
         * The superclass constructor is article step <6>.
         *
         * @param url     server url
         * @param handler channel handler to wrap and hand to the superclass
         * @throws RemotingException propagated from the superclass constructor
         */
        public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
            super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
        }

    <4>ChannelHandlers.wrap

      public static ChannelHandler wrap(ChannelHandler handler, URL url) {
            // <5> the shared ChannelHandlers instance performs the actual wrapping
            ChannelHandlers wrapper = ChannelHandlers.getInstance();
            return wrapper.wrapInternal(handler, url);
        }

    <5>wrapInternal

     protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
            // Decoration order, innermost first:
            //   handler -> adaptive Dispatcher.dispatch(...) -> HeartbeatHandler -> MultiMessageHandler
            // (The original author notes that the default Dispatcher is "all"; that is
            // not visible in this snippet.)
            Dispatcher dispatcher = ExtensionLoader.getExtensionLoader(Dispatcher.class).getAdaptiveExtension();
            ChannelHandler dispatched = dispatcher.dispatch(handler, url);
            ChannelHandler withHeartbeat = new HeartbeatHandler(dispatched);
            return new MultiMessageHandler(withHeartbeat);
        }

    <7>doOpen

    com.alibaba.dubbo.remoting.transport.netty4.NettyServer#doOpen

        /**
         * Opens the Netty server: creates the bootstrap and the boss/worker event loop
         * groups, installs the codec and handler pipeline for accepted channels, then
         * binds to {@code getBindAddress()} and stores the bound channel.
         *
         * @throws Throwable anything raised while configuring or binding
         */
        @Override
        protected void doOpen() throws Throwable {
            bootstrap = new ServerBootstrap();
            // Boss group: a single event loop thread named "NettyServerBoss".
            // The second DefaultThreadFactory argument (true) requests daemon threads.
            bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
            // Worker group: thread count comes from the url parameter Constants.IO_THREADS_KEY,
            // falling back to Constants.DEFAULT_IO_THREADS; threads are also daemon.
            workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                    new DefaultThreadFactory("NettyServerWorker", true));
            // A single NettyServerHandler is created with this server as its delegate
            // and is shared by every accepted channel's pipeline; its channel collection
            // is kept in the 'channels' field.
            // Original author's note on the handler chain (not verifiable here):
            // NettyServerHandler -> NettyServer -> ... -> protocol request handler.
            final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
            channels = nettyServerHandler.getChannels();
            // Option reference: https://blog.csdn.net/zhongzunfa/article/details/94590670
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)// disable Nagle's algorithm on accepted channels
                    .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)// allow local address reuse
                    .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)// use the default pooled buffer allocator
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) throws Exception {
                            // A fresh codec adapter is built for each accepted channel.
                            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                            ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
                                    .addLast("decoder", adapter.getDecoder())// decoder
                                    .addLast("encoder", adapter.getEncoder())// encoder
                                    .addLast("handler", nettyServerHandler);// shared handler
                        }
                    });
            // Bind and wait (uninterruptibly) for the bind to complete, then keep the channel.
            ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
            channelFuture.syncUninterruptibly();
            channel = channelFuture.channel();
    
        }

    <6>AbstractServer

        /**
         * Reads the export address, bind address, accept limit and idle timeout from
         * the url, opens the server via {@code doOpen()} (article step <7>, implemented
         * by subclasses), and finally looks up the executor registered for the url's port.
         *
         * @param url     server url, e.g. dubbo://127.0.0.1:23888....
         * @param handler channel handler passed to the superclass
         * @throws RemotingException wrapping anything thrown by {@code doOpen()}
         */
        public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
            super(url, handler);

            // Exported address: host and port exactly as they appear in the url.
            localAddress = getUrl().toInetSocketAddress();

            // Bind host/port: the Constants.BIND_IP_KEY / Constants.BIND_PORT_KEY
            // parameters take precedence over the url's own host and port.
            String host = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
            int port = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
            // Use the wildcard host when the anyhost parameter is set or the resolved
            // host is not a valid local host.
            // See: https://www.cnblogs.com/LQBlog/p/12469007.html#autoid-6-12-0
            boolean bindToAnyHost = url.getParameter(Constants.ANYHOST_KEY, false)
                    || NetUtils.isInvalidLocalHost(host);
            bindAddress = new InetSocketAddress(bindToAnyHost ? NetUtils.ANYHOST : host, port);

            // Accept limit, read from the Constants.ACCEPTS_KEY parameter.
            // Docs: http://dubbo.apache.org/zh-cn/docs/user/references/xml/dubbo-protocol.html
            this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
            // Idle timeout, read from the Constants.IDLE_TIMEOUT_KEY parameter
            // (milliseconds according to the original author's note).
            this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);

            try {
                // <7> open the server; the concrete work is done by the subclass
                doOpen();
                if (logger.isInfoEnabled()) {
                    logger.info("Start " + getClass().getSimpleName()
                            + " bind " + getBindAddress()
                            + ", export " + getLocalAddress());
                }
            } catch (Throwable t) {
                throw new RemotingException(url.toInetSocketAddress(), null,
                        "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress()
                                + ", cause: " + t.getMessage(),
                        t);
            }

            // Fetch the executor stored in the default DataStore under this url's port.
            DataStore store = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
            String portKey = Integer.toString(url.getPort());
            executor = (ExecutorService) store.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, portKey);
        }
  • 相关阅读:
    17字符串函数
    16数学函数
    计算文件的相对路径
    PHP生成唯一ID的方法
    PHP高效产生m个n范围内的不重复随机数(m<=n)
    随机红包
    约瑟夫环问题
    求n以内的质数(质数的定义:在大于1的自然数中,除了1和它本身意外,无法被其他自然数整除的数)
    10个值得深思的_PHP_面试问题
    PHP中被忽略的性能优化利器:生成器
  • 原文地址:https://www.cnblogs.com/LQBlog/p/12517551.html
Copyright © 2011-2022 走看看