zoukankan      html  css  js  c++  java
  • Netty 客户端源码(Netty 3.x)

    随笔记录。

    // Client code: create the ChannelFactory, passing two cached thread pools.
    // The four-argument constructor quoted in this article names its first two
    // parameters bossExecutor and workerExecutor.
    // NOTE(review): the two-argument overload used here is not shown; presumably it
    // delegates to the four-argument constructor with default counts -- confirm.

    ChannelFactory factory = new NioClientSocketChannelFactory(

                      Executors.newCachedThreadPool(),

                      Executors.newCachedThreadPool());

    /**
     * NioClientSocketChannelFactory constructor (excerpt; "..." marks code the
     * article omitted).
     *
     * Stores both executors on the factory and builds the
     * NioClientSocketPipelineSink that is kept in the sink field.
     *
     * @param bossExecutor   executor stored on the factory and passed to the sink
     * @param workerExecutor executor stored on the factory and passed to the sink
     * @param bossCount      forwarded unchanged to the sink constructor
     * @param workerCount    forwarded unchanged to the sink constructor
     */

    public NioClientSocketChannelFactory(

                Executor bossExecutor, Executor workerExecutor,

                int bossCount, int workerCount) {

            ...

            // Executor for the boss side (a thread pool in the client code).

            this.bossExecutor = bossExecutor;

            // Executor for the worker side (a thread pool in the client code).

            this.workerExecutor = workerExecutor;

           // Build the ChannelSink: a NioClientSocketPipelineSink instance.

           // NOTE(review): per the original author's note, bossCount defaults to 1 and
           // workerCount to Runtime.getRuntime().availableProcessors() * 2; the overloads
           // that supply those defaults are not shown in this excerpt.

            sink = new NioClientSocketPipelineSink(

                    bossExecutor, workerExecutor, bossCount, workerCount);

    }

    // NioClientSocketPipelineSink构造方法

    NioClientSocketPipelineSink(Executor bossExecutor, Executor workerExecutor,

                int bossCount, int workerCount) {

            this.bossExecutor = bossExecutor;

          

            bosses = new Boss[bossCount];

            for (int i = 0; i < bosses.length; i ++) {

                bosses[i] = new Boss(i + 1);

            }

           

            workers = new NioWorker[workerCount];

            for (int i = 0; i < workers.length; i ++) {

                workers[i] = new NioWorker(id, i + 1, workerExecutor);

            }

    }

    // Client code: create the bootstrap and hand it the factory.
    // NOTE(review): presumably the constructor stores the factory via
    // Bootstrap#setFactory (quoted next in the article) -- confirm.

    ClientBootstrap bootstrap = new ClientBootstrap(factory);

    /**
     * Bootstrap#setFactory (excerpt; the ellipsis marks code the article omitted).
     * Stores the given ChannelFactory in the factory field.
     *
     * @param factory the ChannelFactory to remember
     */

    public void setFactory(ChannelFactory factory) {

            …

            this.factory = factory;

    }

    // Client code: install a ChannelPipelineFactory whose getPipeline() builds and
    // returns a ChannelPipeline implementation.
    // Handlers are appended in this order: "encode" (StringEncoder),
    // "decode" (StringDecoder), "handler1" (TimeClientHandler).

    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

                  public ChannelPipeline getPipeline() {

                      ChannelPipeline pipeline = Channels.pipeline();

                      pipeline.addLast("encode",new StringEncoder());

                      pipeline.addLast("decode",new StringDecoder());

                      pipeline.addLast("handler1",new TimeClientHandler());

                      return pipeline;

                  }

              });

    // DefaultChannelPipeline类addLast方法

    /**
     * DefaultChannelPipeline#addLast (excerpt; the ellipses mark code the article
     * omitted). Appends a named handler at the tail of the pipeline.
     *
     * The method is synchronized. When name2ctx is empty the work is delegated to
     * init(name, handler); otherwise a new DefaultChannelHandlerContext is linked
     * after the current tail and recorded in name2ctx under the given name.
     *
     * @param name    key under which the new context is stored in name2ctx
     * @param handler the handler wrapped by the new context
     */
    public synchronized void addLast(String name, ChannelHandler handler) {

        if (name2ctx.isEmpty()) {

            // First handler. Per the original note, init(...) sets up name2ctx,
            // head and tail (init itself is not shown in this excerpt).

            init(name, handler);

        } else {

            …

            DefaultChannelHandlerContext oldTail = tail;

            // NOTE(review): the constructor arguments look like (prev, next, name, handler),
            // i.e. prev = oldTail and next = null -- confirm against the class.
            DefaultChannelHandlerContext    newTail =   new DefaultChannelHandlerContext(oldTail, null, name, handler);

            …

            // Link the new context after the old tail and make it the new tail.

            oldTail.next = newTail;

            tail = newTail;

            name2ctx.put(name, newTail);

            …

        }

    }

    // Client code: start the connection to 127.0.0.1:8080.
    // The value returned by connect(...) is not used here.

    bootstrap.connect (new InetSocketAddress("127.0.0.1", 8080));

    // connect源代码解读

    // ClientBootstrap类connect方法

    /**
     * ClientBootstrap#connect (excerpt; the ellipsis marks code the article omitted).
     *
     * Obtains a pipeline from the pipeline factory, creates a channel for it through
     * the channel factory, applies the bootstrap options, optionally binds to the
     * local address, and finally asks the channel to connect.
     *
     * @param remoteAddress address passed to ch.connect(...)
     * @param localAddress  address to bind to first; the bind is skipped when null
     * @return the ChannelFuture returned by ch.connect(remoteAddress)
     * @throws ChannelPipelineException if the pipeline factory throws while
     *         creating the pipeline
     */
    public ChannelFuture connect(final SocketAddress remoteAddress,

    final SocketAddress localAddress) {

             …

            ChannelPipeline pipeline;

            try {

               // In this walkthrough the factory returns a DefaultChannelPipeline instance.

                pipeline = getPipelineFactory().getPipeline();

            } catch (Exception e) {

                throw new ChannelPipelineException("Failed to initialize a pipeline.", e);

            }

            // Set the options.

            // getFactory() returns the NioClientSocketChannelFactory instance here;
            // its newChannel(...) creates the NioClientSocketChannel.

            Channel ch = getFactory().newChannel(pipeline);

            ch.getConfig().setOptions(getOptions());

            // Bind.

            if (localAddress != null) {

                ch.bind(localAddress);

            }

            // Connect.

            return ch.connect(remoteAddress);

    }

    // NioClientSocketChannelFactory类newChannel方法

    public SocketChannel newChannel(ChannelPipeline pipeline) {

            //this为NioClientSocketChannelFactory实例

           //pipeline为DefaultChannelPipeline实例

           //sink为NioClientSocketPipelineSink实例

           // sink.nextWorker返回一个NioWorker实例

            return new NioClientSocketChannel(this, pipeline, sink, sink.nextWorker());

    }

    // NioClientSocketChannel类构造方法

    /**
     * NioClientSocketChannel constructor.
     *
     * Passes null as the parent channel and a socket obtained from newSocket() to
     * the superclass constructor, then fires the channel-open event for this channel.
     *
     * @param factory  forwarded to the superclass
     * @param pipeline forwarded to the superclass
     * @param sink     forwarded to the superclass
     * @param worker   forwarded to the superclass
     */
    NioClientSocketChannel(

                ChannelFactory factory, ChannelPipeline pipeline,

                ChannelSink sink, NioWorker worker) {

            // newSocket() supplies a new SocketChannel; per the original note it
            // boils down to SocketChannel.open() (newSocket itself is not shown here).

            super(null, factory, pipeline, sink, newSocket(), worker);

            fireChannelOpen(this);

        }

    // 继续看父类NioSocketChannel构造方法

    /**
     * NioSocketChannel constructor (superclass of NioClientSocketChannel).
     *
     * Delegates parent/factory/pipeline/sink to its own superclass, then stores the
     * socket and worker and creates the channel config from socket.socket().
     *
     * @param parent   forwarded to the superclass
     * @param factory  forwarded to the superclass
     * @param pipeline forwarded to the superclass
     * @param sink     forwarded to the superclass
     * @param socket   the NIO SocketChannel stored in the socket field
     * @param worker   the NioWorker stored in the worker field
     */
    public NioSocketChannel(

                Channel parent, ChannelFactory factory,

                ChannelPipeline pipeline, ChannelSink sink,

                SocketChannel socket, NioWorker worker) {

            super(parent, factory, pipeline, sink);

            this.socket = socket;

            this.worker = worker;

            // The config wraps the java.net.Socket view of the NIO channel.
            config = new DefaultNioSocketChannelConfig(socket.socket());

    }

    // 继续看父类AbstractChannel构造方法

    /**
     * AbstractChannel constructor.
     *
     * Stores parent, factory and pipeline, allocates the channel id, and finally
     * attaches this channel and the sink to the pipeline.
     *
     * @param parent   stored in the parent field
     * @param factory  stored in the factory field
     * @param pipeline stored in the pipeline field and attached to this channel
     * @param sink     passed to pipeline.attach(...)
     */
    protected AbstractChannel(

                Channel parent, ChannelFactory factory,

                ChannelPipeline pipeline, ChannelSink sink) {

                       // null in this walkthrough (NioClientSocketChannel passes null as parent).

            this.parent = parent;

                       // The NioClientSocketChannelFactory instance in this walkthrough.

            this.factory = factory;

                       // The DefaultChannelPipeline instance in this walkthrough.

            this.pipeline = pipeline;

            id = allocateId(this);

            pipeline.attach(this, sink);

    }

    // DefaultChannelPipeline类attach方法

    /**
     * DefaultChannelPipeline#attach (excerpt; the ellipsis marks code the article
     * omitted). Records the channel and sink this pipeline belongs to.
     *
     * @param channel stored in the channel field
     * @param sink    stored in the sink field
     */
    public void attach(Channel channel, ChannelSink sink) {

            …

                       // The NioClientSocketChannel instance in this walkthrough.

            this.channel = channel;

                       // The NioClientSocketPipelineSink instance in this walkthrough.

            this.sink = sink;

    }

    // This is the ch.connect(remoteAddress) call made at the end of
    // ClientBootstrap#connect.

    /**
     * AbstractChannel#connect: delegates to Channels.connect(this, remoteAddress).
     *
     * @param remoteAddress the address to connect to
     * @return the future returned by Channels.connect
     */

    public ChannelFuture connect(SocketAddress remoteAddress) {

            return Channels.connect(this, remoteAddress);

    }

    //类Channels

    public static ChannelFuture connect(Channel channel, SocketAddress remoteAddress) {

            if (remoteAddress == null) {

                throw new NullPointerException("remoteAddress");

            }

            ChannelFuture future = future(channel, true);

                       // DefaultChannelPipeline

                       // 新建一个ChannelState实例DownstreamChannelStateEvent

            channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(

                    channel, future, ChannelState.CONNECTED, remoteAddress));

            return future;

    }

    //类NioClientSocketPipelineSink

    public void eventSunk(

                ChannelPipeline pipeline, ChannelEvent e) throws Exception {

            if (e instanceof ChannelStateEvent) {

                ChannelStateEvent event = (ChannelStateEvent) e;

                NioClientSocketChannel channel =

                    (NioClientSocketChannel) event.getChannel();

                ChannelFuture future = event.getFuture();

                ChannelState state = event.getState();

                Object value = event.getValue();

                switch (state) {

                case OPEN:

                    if (Boolean.FALSE.equals(value)) {

                        channel.worker.close(channel, future);

                    }

                    break;

                case BOUND:

                    if (value != null) {

                        bind(channel, future, (SocketAddress) value);

                    } else {

                        channel.worker.close(channel, future);

                    }

                    break;

                case CONNECTED:

                    if (value != null) {

                                                   //第一次客户端发起连接

                        connect(channel, future, (SocketAddress) value);

                    } else {

                        channel.worker.close(channel, future);

                    }

                    break;

                case INTEREST_OPS:

                    channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());

                    break;

                }

            } else if (e instanceof MessageEvent) {

                MessageEvent event = (MessageEvent) e;

                NioSocketChannel channel = (NioSocketChannel) event.getChannel();

                boolean offered = channel.writeBuffer.offer(event);

                assert offered;

                channel.worker.writeFromUserCode(channel);

            }

    }

    /**
     * NioClientSocketPipelineSink#connect: starts the NIO connect for a channel.
     *
     * If the socket connects immediately, the channel is registered with its worker.
     * Otherwise the connect future is stored on the channel, failure/close listeners
     * are installed, and the channel is handed to a Boss for registration.
     * Any Throwable fails the future, fires an exception event and closes the channel.
     *
     * @param channel       the channel whose socket should connect
     * @param cf            the future to complete for this connect request
     * @param remoteAddress the address to connect to
     */
    private void connect(

                final NioClientSocketChannel channel, final ChannelFuture cf,

                SocketAddress remoteAddress) {

            try {

    // channel.socket was created when the NioClientSocketChannel was constructed.

    // NOTE(review): per the original note the socket was put into non-blocking mode
    // (socket.configureBlocking(false)), which is not shown in this excerpt.

    // In non-blocking mode connect() returns right away; it returns false unless the
    // connection could be established immediately.

    // NOTE(review): the original note says the server has already received and handled
    // the connect request at this point; a false return only shows that the connection
    // is still in progress -- confirm.

                if (channel.socket.connect(remoteAddress)) {

                    channel.worker.register(channel, cf);

                } else {

                    // If the channel is closed before cf completes, fail cf.
                    channel.getCloseFuture().addListener(new ChannelFutureListener() {

                        public void operationComplete(ChannelFuture f)

                                throws Exception {

                            if (!cf.isDone()) {

                                cf.setFailure(new ClosedChannelException());

                            }

                        }

                    });

                    cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);

                    channel.connectFuture = cf;

                                         // Queue the registration. nextBoss() returns a Boss, which is a
                                         // Runnable (it is wrapped in a ThreadRenamingRunnable in Boss#register).

                    nextBoss().register(channel);

                }

            } catch (Throwable t) {

                cf.setFailure(t);

                fireExceptionCaught(channel, t);

                channel.worker.close(channel, succeededFuture(channel));

            }

    }

    /**
     * Boss#register (Boss is an inner class of NioClientSocketPipelineSink).
     *
     * Queues a RegisterTask for the channel. On the first call it also opens the
     * Selector and starts this Boss on bossExecutor through DeadLockProofWorker;
     * later calls reuse the existing selector. Afterwards the selector is woken up
     * if wakenUp could be flipped from false to true.
     *
     * @param channel the channel to register with this boss
     * @throws ChannelException if the Selector cannot be opened
     */

    void register(NioClientSocketChannel channel) {

        Runnable registerTask = new RegisterTask(this, channel);

        Selector selector;

        synchronized (startStopLock) {

            if (!started) {

                // Open a selector if this worker didn't start yet.

                try {

                    // Open a selector.

                    this.selector = selector =  Selector.open();

                } catch (Throwable t) {

                    throw new ChannelException(

                            "Failed to create a selector.", t);

                }

                // Start the worker thread with the new Selector.

                boolean success = false;

                try {

                    // Start the boss, which consumes the register-task queue.

    // bossExecutor is the Executors.newCachedThreadPool() created in the client code.

    // The boss loop (Boss#run) blocks in selector.select(500).

                    DeadLockProofWorker.start(

                            bossExecutor,

                            new ThreadRenamingRunnable(

                                    this, "New I/O client boss #" + id + '-' + subId));

                    success = true;

                } finally {

                    if (!success) {

                        // Release the Selector if the execution fails.

                        try {

                            selector.close();

                        } catch (Throwable t) {

                            logger.warn("Failed to close a selector.", t);

                        }

                        this.selector = selector = null;

                        // The method will return to the caller at this point.

                    }

                }

            } else {

                // Use the existing selector if this worker has been started.

                selector = this.selector;

            }

            assert selector != null && selector.isOpen();

            started = true;

            // Put the registration task on the queue.

            boolean offered = registerTaskQueue.offer(registerTask);

            assert offered;

        }

        // Wake the selector only when wakenUp flips from false to true.
        if (wakenUp.compareAndSet(false, true)) {

            selector.wakeup();

        }

    }

    /**
     * DeadLockProofWorker#start (excerpt; "......" marks code the article omitted).
     *
     * Submits a wrapper task to the parent executor. The wrapper stores the
     * executor in PARENT while the given runnable runs and removes it afterwards,
     * even if the runnable throws.
     *
     * @param parent   the executor that runs the task (bossExecutor in this walkthrough)
     * @param runnable the task to run (a ThreadRenamingRunnable in this walkthrough)
     */

    public static void start(final Executor parent, final Runnable runnable) {

      // parent is bossExecutor here, i.e. a thread pool.

            ......

    // Hand the task to the executor so that it runs on one of its threads.

            parent.execute(new Runnable() {

                public void run() {

                    PARENT.set(parent);

                    try {

                       // runnable is the ThreadRenamingRunnable instance.

                        runnable.run();

                    } finally {

                        PARENT.remove();

                    }

                }

            });

    }

    /**
     * ThreadRenamingRunnable#run (excerpt; "......" marks code the article omitted,
     * which is where renamed, currentThread and oldThreadName are set up).
     *
     * Runs the wrapped runnable and, if the thread was renamed, restores the old
     * thread name afterwards.
     */

    public void run() {

           ......

            // Run the actual runnable and revert the name back when it ends.

            try {

              // runnable is the Boss instance in this walkthrough.

                runnable.run();

            } finally {

                if (renamed) {

                    // Revert the name back if the current thread was renamed.

                    // We do not check the exception here because we know it works.

                    currentThread.setName(oldThreadName);

                }

            }

    }

    /**
     * Boss#run (Boss is an inner class of NioClientSocketPipelineSink; excerpt, the
     * ellipses mark code the article omitted).
     *
     * Endless loop: select with a 500 ms timeout, run the queued registration
     * tasks, then process any selected keys. The omitted parts presumably use
     * shutdown and lastConnectTimeoutCheckTimeNanos, which are only declared in
     * the visible code.
     */

    public void run() {

        boolean shutdown = false;

        Selector selector = this.selector;

        long lastConnectTimeoutCheckTimeNanos = System.nanoTime();

        for (;;) {

            wakenUp.set(false);

            try {

                // Block in select for at most 500 ms.

                int selectedKeyCount = selector.select(500);

                // NOTE(review): wakes the selector again when wakenUp was set in the
                // meantime -- presumably so a wake-up requested during select is not
                // lost; confirm against the full Netty source.
                if (wakenUp.get()) {

                    selector.wakeup();

                }

                // Drain the task queue;

                // this is where the NIO register operation happens (see RegisterTask#run).

                processRegisterTaskQueue();

                if (selectedKeyCount > 0) {

                    // Handle the keys the selector reported.

                    processSelectedKeys(selector.selectedKeys());

                }

                ……

            } catch (Throwable t) {

               ……

            }

        }

    }

    private void processRegisterTaskQueue() {

    for (;;) {

             //获取事件,task为registerTaskQueue.offer(registerTask);RegisterTask实例

            final Runnable task = registerTaskQueue.poll();

            if (task == null) {

                break;

            }

           //执行NioClientSocketPipelineSink中的内部类RegisterTask的Run方法

            task.run();

        }

    }

    /**
     * RegisterTask#run (RegisterTask is an inner class of NioClientSocketPipelineSink;
     * excerpt, the ellipsis marks code the article omitted).
     *
     * Registers the channel's socket with the boss selector for OP_CONNECT and
     * attaches the channel to the key. If the socket is already closed, the channel
     * is closed through its worker.
     */

    public void run() {

    try {

             // Register the NIO socket with the boss selector (original note:
             // registration is required before the client can talk to the server).

            channel.socket.register(

                    boss.selector, SelectionKey.OP_CONNECT, channel);

        } catch (ClosedChannelException e) {

            channel.worker.close(channel, succeededFuture(channel));

        }

       ……

    }

    private void processSelectedKeys(Set<SelectionKey> selectedKeys) {

        for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {

            SelectionKey k = i.next();

            i.remove();

            if (!k.isValid()) {

                close(k);

                continue;

            }

            if (k.isConnectable()) {

                //完成客户端连接

                connect(k);

            }

        }

    }

    /**
     * Completes a pending connect for the channel attached to the given key
     * (excerpt; the body of the catch block was omitted by the article).
     *
     * When finishConnect() reports success, the key is cancelled on the boss
     * selector and the channel is registered with its NioWorker together with the
     * stored connect future.
     *
     * @param k a connectable key whose attachment is the NioClientSocketChannel
     */
    private void connect(SelectionKey k) {

        NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();

    try {

             // NIO call that finishes the client-side connect.

            if (ch.socket.finishConnect()) {

                k.cancel();

                // Register the channel with its NioWorker.

                ch.worker.register(ch, ch.connectFuture);

            }

        } catch (Throwable t) {

           .......

        }

    }

    类NioWorker负责读写事件注册处理

    未完待续...

  • 相关阅读:
    关于STM32F405的GPIO中断问题
    No section matches selector
    碳元科技估计要炸
    FreeRTOS 时间片,外部中断,任务优先级的一个疑问
    STM32之HAL库、标准外设库、LL库
    suggest braces around empty body in an 'if' statement
    快速搭建Redis缓存数据库
    Docker Centos安装Openssh
    vmdk虚拟机转换为OVF模板,导入esxi
    Redis安装及主从配置
  • 原文地址:https://www.cnblogs.com/liuxinan/p/6073424.html
Copyright © 2011-2022 走看看