zoukankan      html  css  js  c++  java
  • Netty概述

    1. 原生NIO存在的问题

      NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。需要具备其他的额外技能:要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。

      开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。

    2. netty 介绍

    1. 介绍

    官网: https://netty.io/

    1.  Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.

    翻译之后就是:Netty是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器

    2.Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序

    3.Netty 可以帮助你快速、简单的开发出一个网络应用,相当于简化和流程化了 NIO 的开发过程

    4.Netty 是目前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。

    5. 架构图如下:

     2. 优点

    1. Netty 对 JDK 自带的 NIO 的 API 进行了封装,解决了上述NIO编程难的问题。

    2. 设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池.

    3. 使用方便:详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。(netty版本分为 netty3.x 和 netty4.x、netty5.x(因为Netty5出现重大bug,已经被官网废弃了,目前推荐使用的是Netty4.x的稳定版本))

    4. 高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。

    5. 安全:完整的 SSL/TLS 和 StartTLS 支持。

    6. 社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入

    3. Netty高性能架构设计 

    1. 线程模型基本介绍

    目前存在的线程模型有:传统阻塞 I/O 服务模型 和 Reactor 模式

    Reactor 模式根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现

    (1)单 Reactor 单线程;

    (2)单 Reactor 多线程;

    (3)主从 Reactor 多线程

    Netty 线程模式(Netty 主要基于主从 Reactor 多线程模型做了一定的改进,其中主从 Reactor 多线程模型有多个 Reactor)

    2. 传统的IO服务模型

      传统的IO模型就是我们的一请求一线程模型。这样会存在以下问题:

    当并发数很大,就会创建大量的线程,占用很大系统资源

    连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在read 操作,造成线程资源浪费

    3. Reactor 模式

      I/O 复用结合线程池,就是 Reactor 模式基本设计思想。

      通过一个或多个输入同时传递给服务处理器的模式(基于事件驱动),服务器端程序处理传入的多个请求,并将它们同步分派到相应的处理线程, 因此Reactor模式也叫 Dispatcher模式。Reactor 模式使用IO复用监听事件, 收到事件后,分发给某个线程(进程), 这点就是网络服务器高并发处理关键。

    核心组成:

    Reactor:Reactor 在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对 IO 事件做出反应。

    Handlers:处理程序执行 I/O 事件要完成的实际事件,Reactor 通过调度适当的处理程序来响应 I/O 事件,处理程序执行非阻塞操作。

    4.Reactor 模式分类

    1. 单 Reactor 单线程

      服务器端用一个线程通过多路复用搞定所有的 IO 操作(包括连接,读、写等),编码简单,清晰明了,但是如果客户端连接数量较多,将无法支撑。简单点说就是一个Reactor处理多个请求,且Handler是以同步阻塞的方式处理请求。

    对应结构图如下:

    例如:

    Server:

    package nio;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    
    
    public class GroupChatServer {
    
        /**
         * Selector 选择器
         */
        private Selector selector;
    
        /**
         * ServerSocketChannel
         */
        private ServerSocketChannel serverSocketChannel;
    
        /**
         * 端口
         */
        private static final int PORT = 6667;
    
        public GroupChatServer() {
            try {
                selector = Selector.open();
                serverSocketChannel = ServerSocketChannel.open();
                // 绑定端口
                serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
                // 设置非阻塞模式
                serverSocketChannel.configureBlocking(false);
                // 注册连接请求事件
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 监听
         */
        public void listen() {
            System.out.println("监听线程: " + Thread.currentThread().getName());
    
            try {
                while (true) {
                    int count = selector.select();
                    // 有事件处理
                    if (count > 0) {
                        // 处理监听到的事件
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        SelectionKey key = null;
                        while (iterator.hasNext()) {
                            key = iterator.next();
                            // 连接请求事件
                            if (key.isAcceptable()) {
                                SocketChannel socketChannel = serverSocketChannel.accept();
                                // 设置非阻塞
                                socketChannel.configureBlocking(false);
                                // 注册读取事件
                                socketChannel.register(selector, SelectionKey.OP_READ);
                                // 提示
                                System.out.println(socketChannel.getRemoteAddress() + " 上线 ");
                            }
    
                            // 数据读取事件
                            if (key.isReadable()) {
                                new GroupServerHandler(selector).readData(key);
                            }
    
                            // 删除当前的key,防止重复处理
                            iterator.remove();
                        }
                    }
                }
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            //创建服务器对象
            GroupChatServer groupChatServer = new GroupChatServer();
            groupChatServer.listen();
        }
    }

    Client

    package nio;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Scanner;
    
    public class GroupChatClient {
    
        /**
         * 服务器地址
         */
        private final String HOST = "127.0.0.1";
        /**
         * 服务器端口
         */
        private final int PORT = 6667;
    
        private Selector selector;
        private SocketChannel socketChannel;
        private String username;
    
        public GroupChatClient() {
            try {
                selector = Selector.open();
                // 连接服务器
                socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
                // 设置非阻塞
                socketChannel.configureBlocking(false);
                // 将channel 注册到selector, 注册读取事件
                socketChannel.register(selector, SelectionKey.OP_READ);
                // 得到username
                username = socketChannel.getLocalAddress().toString().substring(1);
                System.out.println(username + " is ok!");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public void sendMsg(String msg) {
            msg = username + " 说:" + msg;
            try {
                socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private void readMsg() {
            try {
                int count = selector.select();
                // 有可用的通道
                if (count > 0) {
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        if (key.isReadable()) {
                            SocketChannel channel = (SocketChannel) key.channel();
                            // 开启一个buffer
                            ByteBuffer buffer = ByteBuffer.allocate(1024);
                            channel.read(buffer);
                            //把读到的缓冲区的数据转成字符串
                            String msg = new String(buffer.array());
                            System.out.println(msg.trim());
                        }
                    }
    
                    // 删除当前的selectionKey, 防止重复操作
                    iterator.remove();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            GroupChatClient chatClient = new GroupChatClient();
            // 启动一个线程, 每3秒,读取从服务器发送数据
            new Thread() {
                public void run() {
                    while (true) {
                        chatClient.readMsg();
                        try {
                            Thread.currentThread().sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }.start();
    
            //发送数据给服务器端
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String s = scanner.nextLine();
                chatClient.sendMsg(s);
            }
        }
    }

    Handler

    package nio;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.Channel;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    
    public class GroupServerHandler {
    
        /**
         * Selector 选择器
         */
        private Selector selector;
    
        public GroupServerHandler(Selector selector) {
            this.selector = selector;
        }
    
        /**
         * 读取数据
         *
         * @param key
         */
        public void readData(SelectionKey key) {
            SocketChannel channel = null;
            try {
                channel = (SocketChannel) key.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                int read = channel.read(byteBuffer);
                if (read > 0) {
                    //把缓存区的数据转成字符串
                    String msg = new String(byteBuffer.array());
                    System.out.println("form 客户端: " + msg);
    
                    //向其它的客户端转发消息(去掉自己), 专门写一个方法来处理
                    sendInfoToOtherClients(msg, channel);
                }
            } catch (Exception e) {
                e.printStackTrace();
    
                try {
                    System.out.println(channel.getRemoteAddress() + " 离线了..");
                    //取消注册
                    key.cancel();
                    //关闭通道
                    channel.close();
                } catch (IOException e2) {
                    e2.printStackTrace();
                }
            }
        }
    
        private void sendInfoToOtherClients(String msg, SocketChannel channel) throws IOException {
            System.out.println("服务器转发消息中...");
            System.out.println("当前线程: " + Thread.currentThread().getName());
    
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
                // ignore
            }
    
            for (SelectionKey key : selector.keys()) {
                // 通过 key  取出对应的 SocketChannel
                Channel targetChannel = key.channel();
                // 排除掉自己
                if (targetChannel instanceof SocketChannel && targetChannel != channel) {
                    // 转型
                    SocketChannel dest = (SocketChannel) targetChannel;
                    // 将msg 存储到buffer
                    ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                    // 将buffer 的数据写入 通道
                    dest.write(buffer);
                    System.out.println("转发给: " + dest.getRemoteAddress());
                }
            }
        }
    }

    方案分析:

    优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成

    缺点:

    (1) 性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。Handler 在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈

    (2) 可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障

    使用场景:客户端的数量有限,业务处理非常快速,比如 Redis在业务处理的时间复杂度 O(1) 的情况

    2. 单 Reactor 多线程

      说白了就是一个Reactor,后面Handler处理的时候用多线程处理。handler 只负责响应事件,不做具体的业务处理, 通过read 读取数据后,会分发给后面的worker线程池的某个线程处理业务。

     改造上面代码:

    (1) 增加线程工具类

    package nio;
    
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class ThreadUtils {
    
        private static final ExecutorService executorService = Executors.newFixedThreadPool(3);
    
        public static void execute(Runnable run) {
            executorService.execute(run);
        }
    }

    (2) 修改Handler

    package nio;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.Channel;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    
    public class GroupServerHandler {
    
        /**
         * Selector 选择器
         */
        private Selector selector;
    
        public GroupServerHandler(Selector selector) {
            this.selector = selector;
        }
    
        /**
         * 读取数据
         *
         * @param key
         */
        public void readData(SelectionKey key) {
            ThreadUtils.execute(new Runnable() {
                @Override
                public void run() {
                    SocketChannel channel = null;
                    try {
                        channel = (SocketChannel) key.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        int read = channel.read(byteBuffer);
                        if (read > 0) {
                            //把缓存区的数据转成字符串
                            String msg = new String(byteBuffer.array());
                            System.out.println("form 客户端: " + msg);
    
                            //向其它的客户端转发消息(去掉自己), 专门写一个方法来处理
                            sendInfoToOtherClients(msg, channel);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
    
                        try {
                            System.out.println(channel.getRemoteAddress() + " 离线了..");
                            //取消注册
                            key.cancel();
                            //关闭通道
                            channel.close();
                        } catch (IOException e2) {
                            e2.printStackTrace();
                        }
                    }
                }
            });
        }
    
        private void sendInfoToOtherClients(String msg, SocketChannel channel) throws IOException {
            System.out.println("服务器转发消息中...");
            System.out.println("当前线程: " + Thread.currentThread().getName());
    
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
                // ignore
            }
    
            for (SelectionKey key : selector.keys()) {
                // 通过 key  取出对应的 SocketChannel
                Channel targetChannel = key.channel();
                // 排除掉自己
                if (targetChannel instanceof SocketChannel && targetChannel != channel) {
                    // 转型
                    SocketChannel dest = (SocketChannel) targetChannel;
                    // 将msg 存储到buffer
                    ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                    // 将buffer 的数据写入 通道
                    dest.write(buffer);
                    System.out.println("转发给: " + dest.getRemoteAddress());
                }
            }
        }
    }

    方案分析:

    优点:可以充分的利用多核cpu 的处理能力

    缺点:多线程数据共享和访问比较复杂, reactor 处理所有的事件的监听和响应,在单线程运行, 在高并发场景容易出现性能瓶颈.

    3. 主从 Reactor 多线程

      简单理解就是Reactor也加入多线程。Reactor主线程 MainReactor 对象通过select 监听连接事件, 收到事件后,通过Acceptor 处理连接事件。当 Acceptor 处理连接事件后,MainReactor 将连接分配给SubReactor。

    代码改造:

    1. MainReactor 和 SubReactor:

    package nio;
    
    import org.apache.commons.collections.CollectionUtils;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.channels.spi.SelectorProvider;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    
    public class MainReactor {
    
        /**
         * Selector 选择器
         */
        private Selector selector;
    
        /**
         * ServerSocketChannel
         */
        private ServerSocketChannel serverSocketChannel;
    
        /**
         * 端口
         */
        private static final int PORT = 6667;
    
        public MainReactor() {
            try {
                selector = Selector.open();
                serverSocketChannel = ServerSocketChannel.open();
                // 绑定端口
                serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
                // 设置非阻塞模式
                serverSocketChannel.configureBlocking(false);
                // 注册连接请求事件
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 启动
         */
        public void start() {
            System.out.println("监听线程: " + Thread.currentThread().getName());
    
            // 启动SubReactor 用于处理事件
            initSubReactor();
    
            try {
                int index = -1;
                while (true) {
                    int count = selector.select();
                    // 有事件处理
                    if (count > 0) {
                        // 处理监听到的事件
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        SelectionKey key = null;
                        while (iterator.hasNext()) {
                            key = iterator.next();
                            // 连接请求事件。只监听OP_ACCEPT 事件, 监听到之后转交给SubReactor
                            if (key.isAcceptable()) {
                                SocketChannel socketChannel = serverSocketChannel.accept();
                                if (socketChannel != null) {
                                    // 设置非阻塞
                                    socketChannel.configureBlocking(false);
                                    // 交给子线程
                                    System.out.println(socketChannel.getRemoteAddress() + " 上线 ");
                                    subReactors.get((++index) % 2).registerChannel(socketChannel);
                                }
                            }
    
                            iterator.remove();
                        }
                    }
                }
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
    
        private List<SubReactor> subReactors = new ArrayList<>();
    
        /**
         * 初始化subReactor, 实际上两个够用了。 Tomcat 8 默认也是用的两个子线程。
         */
        public void initSubReactor() {
            if (CollectionUtils.isNotEmpty(subReactors)) {
                return;
            }
    
            subReactors.add(new SubReactor());
            subReactors.add(new SubReactor());
            for (SubReactor subReactor : subReactors) {
                new Thread(subReactor).start();
            }
        }
    
        /**
         * SubReactor 负责处理事件
         * 这里需要注意: 参考https://blog.csdn.net/wanger61/article/details/104951149/
         * 1. SubReactor线程在执行selector.select()时由于有已就绪的,所以不会阻塞而继续往下执行。
         * 2. Reactor线程在第一次执行selector.select()时,必须要有已就绪的事件,否则后面即使有了已就绪的事件,还是会阻塞在selector.select()上。
         */
        public static class SubReactor implements Runnable {
    
            private Selector selector;
    
            public SubReactor() {
                // 每个SubReactor 一个selector
                try {
                    this.selector = SelectorProvider.provider().openSelector();
                } catch (Exception exception) {
                    exception.printStackTrace();
                }
            }
    
            public void registerChannel(SocketChannel sc) throws Exception {
                System.out.println("接收到注册sc: " + sc.getRemoteAddress());
                sc.register(selector, SelectionKey.OP_READ);
    //            selector.wakeup();
            }
    
            @Override
            public void run() {
                System.out.println("SubReactor线程: " + Thread.currentThread().getName());
    
                try {
                    while (true) {
                        // 注意这里不能用selector.select(), 用这个方法会一直阻塞。 select(1) 会造成CPU的浪费。
    //                    int count = selector.select();
                        int count = selector.select(1);
                        // 有事件处理
                        if (count > 0) {
                            // 处理监听到的事件
                            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                            SelectionKey key = null;
                            while (iterator.hasNext()) {
                                key = iterator.next();
                                // 连接请求事件
                                if (key.isReadable()) {
                                    new GroupServerHandler(selector).readData(key);
                                } else {
                                    System.out.println("xxxxxxxxxxxxxxxxxxx");
                                }
    
                                iterator.remove();
                            }
                        }
                    }
                } catch (Throwable e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) {
            //创建服务器对象
            MainReactor mainReactor = new MainReactor();
            mainReactor.start();
        }
    }

    ServerHandler:

    package nio;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.Channel;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    
    public class GroupServerHandler {
    
        /**
         * Selector 选择器
         */
        private Selector selector;
    
        public GroupServerHandler(Selector selector) {
            this.selector = selector;
        }
    
        /**
         * 读取数据
         *
         * @param key
         */
        public void readData(SelectionKey key) {
            ThreadUtils.execute(new Runnable() {
                @Override
                public void run() {
                    SocketChannel channel = null;
                    try {
                        channel = (SocketChannel) key.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        int read = channel.read(byteBuffer);
                        if (read > 0) {
                            //把缓存区的数据转成字符串
                            String msg = new String(byteBuffer.array());
                            System.out.println("form 客户端: " + msg);
    
                            //向其它的客户端转发消息(去掉自己), 专门写一个方法来处理
                            sendInfoToOtherClients(msg, channel);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
    
                        try {
                            System.out.println(channel.getRemoteAddress() + " 离线了..");
                            //取消注册
                            key.cancel();
                            //关闭通道
                            channel.close();
                        } catch (IOException e2) {
                            e2.printStackTrace();
                        }
                    }
                }
            });
        }
    
        private void sendInfoToOtherClients(String msg, SocketChannel channel) throws IOException {
            System.out.println("服务器转发消息中...");
            System.out.println("当前线程: " + Thread.currentThread().getName());
    
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
                // ignore
            }
    
            for (SelectionKey key : selector.keys()) {
                // 通过 key  取出对应的 SocketChannel
                Channel targetChannel = key.channel();
                // 排除掉自己
                if (targetChannel instanceof SocketChannel && targetChannel != channel) {
                    // 转型
                    SocketChannel dest = (SocketChannel) targetChannel;
                    // 将msg 存储到buffer
                    ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                    // 将buffer 的数据写入 通道
                    dest.write(buffer);
                    System.out.println("转发给: " + dest.getRemoteAddress());
                }
            }
        }
    }

    客户端代码同上面。

      这里需要提别注意,在SUbReactor 子线程的run 方法中调用selector.select() 会一直阻塞。可以用好

     方案优缺点:

    优点:

    (1) 父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。

    (2) 父线程与子线程的数据交互简单,Reactor 主线程只需要把新连接传给子线程,子线程无需返回数据。

    缺点:编程复杂度较高

    结合实例:这种模型在许多项目中广泛使用,包括 Nginx 主从 Reactor 多进程模型,Memcached 主从多线程,Netty 主从多线程模型的支持

     4. netty 模型

      Netty 主要基于主从 Reactors 多线程模型做了一定的改进,其中主从 Reactor 多线程模型有多个 Reactor。

    1. 简单理解

    (1) BossGroup 线程维护Selector , 只关注Accecpt

    (2) 当接收到Accept事件,获取到对应的SocketChannel, 封装成 NIOScoketChannel并注册到Worker 线程(事件循环), 并进行维护

    (3) 当Worker线程监听到selector 中通道发生自己感兴趣的事件后,就进行处理(就由handler), 此时handler 已经加入到通道

    2. 模型理解

    (1)  Netty抽象出两组线程池,BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写

    (2) BossGroup和WorkerGroup类型的本质都是NioEventLoopGroup类型。

    (3) NioEventLoopGroup相当于一个事件循环组,它下面维护很多个NioEventLoop(事件循环)。

    (4) NioEventLoop表示一个不断循环的执行处理任务的线程。每个NioEventLoop都包含一个Selector,用于监听绑定在它上面的socket通讯。

    (5) 每个Boss NioEventLoop循环执行的步骤有3步:

    1》轮询accept事件

    2》处理accept事件,与client建立连接,生成NioSocketChannel,并将其注册到某个Worker NioEventLoop的selector上

    3》处理任务队列到任务,即runAllTasks

    (6) 每个Worker NioEventLoop循环执行的步骤:

    1》轮询read,write事件

    2》处理I/O事件,即read,write事件,在对应的NioSocketChannel中进行处理

    3》处理任务队列的任务,即runAllTasks

    (7) 每个 Worker NioEventLoop处理业务时,会使用pipeline(管道),pipeline中包含了channel 通道,即同pipeline可以获取到channel通道。同时管道中维护了很多处理器。

    【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
  • 相关阅读:
    【算法每日一练】LeetCode02 两数之和
    【算法每日一练】LeetCode01 两数之和
    【算法题】09-单链表翻转
    【算法题】08- 构造数组的MaxTree
    【算法题】07-生成窗口最大值数组
    【算法题】06-用栈来解决汉诺塔问题
    【算法题】05-用一个栈实现另一个栈的排序
    【算法题】04-猫狗队列
    【算法题】03-使用递归和栈逆序一个栈
    【算法题】02-使用两个栈实现队列额的功能
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/14465508.html
Copyright © 2011-2022 走看看