zoukankan      html  css  js  c++  java
  • Netty概述

    1. 原生NIO存在的问题

      NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等。需要具备其他的额外技能:要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。

      开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。

    2. netty 介绍

    1. 介绍

    官网: https://netty.io/

    1.  Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.

    翻译之后就是:Netty是一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器

    2.Netty 是由 JBOSS 提供的一个 Java 开源框架。Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序

    3.Netty 可以帮助你快速、简单的开发出一个网络应用,相当于简化和流程化了 NIO 的开发过程

    4.Netty 是目前最流行的 NIO 框架,Netty 在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,知名的 Elasticsearch 、Dubbo 框架内部都采用了 Netty。

    5. 架构图如下:

     2. 优点

    1. Netty 对 JDK 自带的 NIO 的 API 进行了封装,解决了上述NIO编程难的问题。

    2. 设计优雅:适用于各种传输类型的统一 API 阻塞和非阻塞 Socket;基于灵活且可扩展的事件模型,可以清晰地分离关注点;高度可定制的线程模型 - 单线程,一个或多个线程池.

    3. 使用方便:详细记录的 Javadoc,用户指南和示例;没有其他依赖项,JDK 5(Netty 3.x)或 6(Netty 4.x)就足够了。(netty版本分为 netty3.x 和 netty4.x、netty5.x(因为Netty5出现重大bug,已经被官网废弃了,目前推荐使用的是Netty4.x的稳定版本))

    4. 高性能、吞吐量更高:延迟更低;减少资源消耗;最小化不必要的内存复制。

    5. 安全:完整的 SSL/TLS 和 StartTLS 支持。

    6. 社区活跃、不断更新:社区活跃,版本迭代周期短,发现的 Bug 可以被及时修复,同时,更多的新功能会被加入

    3. Netty高性能架构设计 

    1. 线程模型基本介绍

    目前存在的线程模型有:传统阻塞 I/O 服务模型 和 Reactor 模式

    Reactor 模式根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现

    (1)单 Reactor 单线程;

    (2)单 Reactor 多线程;

    (3)主从 Reactor 多线程

    Netty 线程模式(Netty 主要基于主从 Reactor 多线程模型做了一定的改进,其中主从 Reactor 多线程模型有多个 Reactor)

    2. 传统的IO服务模型

      传统的IO模型就是我们的一请求一线程模型。这样会存在以下问题:

    当并发数很大,就会创建大量的线程,占用很大系统资源

    连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在read 操作,造成线程资源浪费

    3. Reactor 模式

      I/O 复用结合线程池,就是 Reactor 模式基本设计思想。

      通过一个或多个输入同时传递给服务处理器的模式(基于事件驱动),服务器端程序处理传入的多个请求,并将它们同步分派到相应的处理线程, 因此Reactor模式也叫 Dispatcher模式。Reactor 模式使用IO复用监听事件, 收到事件后,分发给某个线程(进程), 这点就是网络服务器高并发处理关键。

    核心组成:

    Reactor:Reactor 在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对 IO 事件做出反应。

    Handlers:处理程序执行 I/O 事件要完成的实际事件,Reactor 通过调度适当的处理程序来响应 I/O 事件,处理程序执行非阻塞操作。

    4.Reactor 模式分类

    1. 单 Reactor 单线程

      服务器端用一个线程通过多路复用搞定所有的 IO 操作(包括连接,读、写等),编码简单,清晰明了,但是如果客户端连接数量较多,将无法支撑。简单点说就是一个Reactor处理多个请求,且Handler是以同步阻塞的方式处理请求。

    对应结构图如下:

    例如:

    Server:

    package nio;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    
    
    public class GroupChatServer {
    
        /**
         * Selector 选择器
         */
        private Selector selector;
    
        /**
         * ServerSocketChannel
         */
        private ServerSocketChannel serverSocketChannel;
    
        /**
         * 端口
         */
        private static final int PORT = 6667;
    
        public GroupChatServer() {
            try {
                selector = Selector.open();
                serverSocketChannel = ServerSocketChannel.open();
                // 绑定端口
                serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
                // 设置非阻塞模式
                serverSocketChannel.configureBlocking(false);
                // 注册连接请求事件
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 监听
         */
        public void listen() {
            System.out.println("监听线程: " + Thread.currentThread().getName());
    
            try {
                while (true) {
                    int count = selector.select();
                    // 有事件处理
                    if (count > 0) {
                        // 处理监听到的事件
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        SelectionKey key = null;
                        while (iterator.hasNext()) {
                            key = iterator.next();
                            // 连接请求事件
                            if (key.isAcceptable()) {
                                SocketChannel socketChannel = serverSocketChannel.accept();
                                // 设置非阻塞
                                socketChannel.configureBlocking(false);
                                // 注册读取事件
                                socketChannel.register(selector, SelectionKey.OP_READ);
                                // 提示
                                System.out.println(socketChannel.getRemoteAddress() + " 上线 ");
                            }
    
                            // 数据读取事件
                            if (key.isReadable()) {
                                new GroupServerHandler(selector).readData(key);
                            }
    
                            // 删除当前的key,防止重复处理
                            iterator.remove();
                        }
                    }
                }
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            //创建服务器对象
            GroupChatServer groupChatServer = new GroupChatServer();
            groupChatServer.listen();
        }
    }

    Client

    package nio;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Scanner;
    
    public class GroupChatClient {
    
        /**
         * 服务器地址
         */
        private final String HOST = "127.0.0.1";
        /**
         * 服务器端口
         */
        private final int PORT = 6667;
    
        private Selector selector;
        private SocketChannel socketChannel;
        private String username;
    
        public GroupChatClient() {
            try {
                selector = Selector.open();
                // 连接服务器
                socketChannel = SocketChannel.open(new InetSocketAddress(HOST, PORT));
                // 设置非阻塞
                socketChannel.configureBlocking(false);
                // 将channel 注册到selector, 注册读取事件
                socketChannel.register(selector, SelectionKey.OP_READ);
                // 得到username
                username = socketChannel.getLocalAddress().toString().substring(1);
                System.out.println(username + " is ok!");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public void sendMsg(String msg) {
            msg = username + " 说:" + msg;
            try {
                socketChannel.write(ByteBuffer.wrap(msg.getBytes()));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private void readMsg() {
            try {
                int count = selector.select();
                // 有可用的通道
                if (count > 0) {
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        if (key.isReadable()) {
                            SocketChannel channel = (SocketChannel) key.channel();
                            // 开启一个buffer
                            ByteBuffer buffer = ByteBuffer.allocate(1024);
                            channel.read(buffer);
                            //把读到的缓冲区的数据转成字符串
                            String msg = new String(buffer.array());
                            System.out.println(msg.trim());
                        }
                    }
    
                    // 删除当前的selectionKey, 防止重复操作
                    iterator.remove();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            GroupChatClient chatClient = new GroupChatClient();
            // 启动一个线程, 每3秒,读取从服务器发送数据
            new Thread() {
                public void run() {
                    while (true) {
                        chatClient.readMsg();
                        try {
                            Thread.currentThread().sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            }.start();
    
            //发送数据给服务器端
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String s = scanner.nextLine();
                chatClient.sendMsg(s);
            }
        }
    }

    Handler

    package nio;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.Channel;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    
    public class GroupServerHandler {
    
        /**
         * Selector 选择器
         */
        private Selector selector;
    
        public GroupServerHandler(Selector selector) {
            this.selector = selector;
        }
    
        /**
         * 读取数据
         *
         * @param key
         */
        public void readData(SelectionKey key) {
            SocketChannel channel = null;
            try {
                channel = (SocketChannel) key.channel();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                int read = channel.read(byteBuffer);
                if (read > 0) {
                    //把缓存区的数据转成字符串
                    String msg = new String(byteBuffer.array());
                    System.out.println("form 客户端: " + msg);
    
                    //向其它的客户端转发消息(去掉自己), 专门写一个方法来处理
                    sendInfoToOtherClients(msg, channel);
                }
            } catch (Exception e) {
                e.printStackTrace();
    
                try {
                    System.out.println(channel.getRemoteAddress() + " 离线了..");
                    //取消注册
                    key.cancel();
                    //关闭通道
                    channel.close();
                } catch (IOException e2) {
                    e2.printStackTrace();
                }
            }
        }
    
        private void sendInfoToOtherClients(String msg, SocketChannel channel) throws IOException {
            System.out.println("服务器转发消息中...");
            System.out.println("当前线程: " + Thread.currentThread().getName());
    
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
                // ignore
            }
    
            for (SelectionKey key : selector.keys()) {
                // 通过 key  取出对应的 SocketChannel
                Channel targetChannel = key.channel();
                // 排除掉自己
                if (targetChannel instanceof SocketChannel && targetChannel != channel) {
                    // 转型
                    SocketChannel dest = (SocketChannel) targetChannel;
                    // 将msg 存储到buffer
                    ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                    // 将buffer 的数据写入 通道
                    dest.write(buffer);
                    System.out.println("转发给: " + dest.getRemoteAddress());
                }
            }
        }
    }

    方案分析:

    优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成

    缺点:

    (1) 性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。Handler 在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈

    (2) 可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障

    使用场景:客户端的数量有限,业务处理非常快速,比如 Redis在业务处理的时间复杂度 O(1) 的情况

    2. 单 Reactor 多线程

      说白了就是一个Reactor,后面Handler处理的时候用多线程处理。handler 只负责响应事件,不做具体的业务处理, 通过read 读取数据后,会分发给后面的worker线程池的某个线程处理业务。

     改造上面代码:

    (1) 增加线程工具类

    package nio;
    
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class ThreadUtils {
    
        private static final ExecutorService executorService = Executors.newFixedThreadPool(3);
    
        public static void execute(Runnable run) {
            executorService.execute(run);
        }
    }

    (2) 修改Handler

    package nio;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.Channel;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    
    public class GroupServerHandler {
    
        /**
         * Selector 选择器
         */
        private Selector selector;
    
        public GroupServerHandler(Selector selector) {
            this.selector = selector;
        }
    
        /**
         * 读取数据
         *
         * @param key
         */
        public void readData(SelectionKey key) {
            ThreadUtils.execute(new Runnable() {
                @Override
                public void run() {
                    SocketChannel channel = null;
                    try {
                        channel = (SocketChannel) key.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        int read = channel.read(byteBuffer);
                        if (read > 0) {
                            //把缓存区的数据转成字符串
                            String msg = new String(byteBuffer.array());
                            System.out.println("form 客户端: " + msg);
    
                            //向其它的客户端转发消息(去掉自己), 专门写一个方法来处理
                            sendInfoToOtherClients(msg, channel);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
    
                        try {
                            System.out.println(channel.getRemoteAddress() + " 离线了..");
                            //取消注册
                            key.cancel();
                            //关闭通道
                            channel.close();
                        } catch (IOException e2) {
                            e2.printStackTrace();
                        }
                    }
                }
            });
        }
    
        private void sendInfoToOtherClients(String msg, SocketChannel channel) throws IOException {
            System.out.println("服务器转发消息中...");
            System.out.println("当前线程: " + Thread.currentThread().getName());
    
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
                // ignore
            }
    
            for (SelectionKey key : selector.keys()) {
                // 通过 key  取出对应的 SocketChannel
                Channel targetChannel = key.channel();
                // 排除掉自己
                if (targetChannel instanceof SocketChannel && targetChannel != channel) {
                    // 转型
                    SocketChannel dest = (SocketChannel) targetChannel;
                    // 将msg 存储到buffer
                    ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                    // 将buffer 的数据写入 通道
                    dest.write(buffer);
                    System.out.println("转发给: " + dest.getRemoteAddress());
                }
            }
        }
    }

    方案分析:

    优点:可以充分的利用多核cpu 的处理能力

    缺点:多线程数据共享和访问比较复杂, reactor 处理所有的事件的监听和响应,在单线程运行, 在高并发场景容易出现性能瓶颈.

    3. 主从 Reactor 多线程

      简单理解就是Reactor也加入多线程。Reactor主线程 MainReactor 对象通过select 监听连接事件, 收到事件后,通过Acceptor 处理连接事件。当 Acceptor 处理连接事件后,MainReactor 将连接分配给SubReactor。

    代码改造:

    1. MainReactor 和 SubReactor:

    package nio;
    
    import org.apache.commons.collections.CollectionUtils;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.channels.spi.SelectorProvider;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    
    public class MainReactor {
    
        /**
         * Selector 选择器
         */
        private Selector selector;
    
        /**
         * ServerSocketChannel
         */
        private ServerSocketChannel serverSocketChannel;
    
        /**
         * 端口
         */
        private static final int PORT = 6667;
    
        public MainReactor() {
            try {
                selector = Selector.open();
                serverSocketChannel = ServerSocketChannel.open();
                // 绑定端口
                serverSocketChannel.socket().bind(new InetSocketAddress(PORT));
                // 设置非阻塞模式
                serverSocketChannel.configureBlocking(false);
                // 注册连接请求事件
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 启动
         */
        public void start() {
            System.out.println("监听线程: " + Thread.currentThread().getName());
    
            // 启动SubReactor 用于处理事件
            initSubReactor();
    
            try {
                int index = -1;
                while (true) {
                    int count = selector.select();
                    // 有事件处理
                    if (count > 0) {
                        // 处理监听到的事件
                        Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                        SelectionKey key = null;
                        while (iterator.hasNext()) {
                            key = iterator.next();
                            // 连接请求事件。只监听OP_ACCEPT 事件, 监听到之后转交给SubReactor
                            if (key.isAcceptable()) {
                                SocketChannel socketChannel = serverSocketChannel.accept();
                                if (socketChannel != null) {
                                    // 设置非阻塞
                                    socketChannel.configureBlocking(false);
                                    // 交给子线程
                                    System.out.println(socketChannel.getRemoteAddress() + " 上线 ");
                                    subReactors.get((++index) % 2).registerChannel(socketChannel);
                                }
                            }
    
                            iterator.remove();
                        }
                    }
                }
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
    
        private List<SubReactor> subReactors = new ArrayList<>();
    
        /**
         * 初始化subReactor, 实际上两个够用了。 Tomcat 8 默认也是用的两个子线程。
         */
        public void initSubReactor() {
            if (CollectionUtils.isNotEmpty(subReactors)) {
                return;
            }
    
            subReactors.add(new SubReactor());
            subReactors.add(new SubReactor());
            for (SubReactor subReactor : subReactors) {
                new Thread(subReactor).start();
            }
        }
    
        /**
         * SubReactor 负责处理事件
         * 这里需要注意: 参考https://blog.csdn.net/wanger61/article/details/104951149/
         * 1. SubReactor线程在执行selector.select()时由于有已就绪的,所以不会阻塞而继续往下执行。
         * 2. Reactor线程在第一次执行selector.select()时,必须要有已就绪的事件,否则后面即使有了已就绪的事件,还是会阻塞在selector.select()上。
         */
        public static class SubReactor implements Runnable {
    
            private Selector selector;
    
            public SubReactor() {
                // 每个SubReactor 一个selector
                try {
                    this.selector = SelectorProvider.provider().openSelector();
                } catch (Exception exception) {
                    exception.printStackTrace();
                }
            }
    
            public void registerChannel(SocketChannel sc) throws Exception {
                System.out.println("接收到注册sc: " + sc.getRemoteAddress());
                sc.register(selector, SelectionKey.OP_READ);
    //            selector.wakeup();
            }
    
            @Override
            public void run() {
                System.out.println("SubReactor线程: " + Thread.currentThread().getName());
    
                try {
                    while (true) {
                        // 注意这里不能用selector.select(), 用这个方法会一直阻塞。 select(1) 会造成CPU的浪费。
    //                    int count = selector.select();
                        int count = selector.select(1);
                        // 有事件处理
                        if (count > 0) {
                            // 处理监听到的事件
                            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                            SelectionKey key = null;
                            while (iterator.hasNext()) {
                                key = iterator.next();
                                // 连接请求事件
                                if (key.isReadable()) {
                                    new GroupServerHandler(selector).readData(key);
                                } else {
                                    System.out.println("xxxxxxxxxxxxxxxxxxx");
                                }
    
                                iterator.remove();
                            }
                        }
                    }
                } catch (Throwable e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String[] args) {
            //创建服务器对象
            MainReactor mainReactor = new MainReactor();
            mainReactor.start();
        }
    }

    ServerHandler:

    package nio;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.Channel;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    
    public class GroupServerHandler {
    
        /**
         * Selector 选择器
         */
        private Selector selector;
    
        public GroupServerHandler(Selector selector) {
            this.selector = selector;
        }
    
        /**
         * 读取数据
         *
         * @param key
         */
        public void readData(SelectionKey key) {
            ThreadUtils.execute(new Runnable() {
                @Override
                public void run() {
                    SocketChannel channel = null;
                    try {
                        channel = (SocketChannel) key.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        int read = channel.read(byteBuffer);
                        if (read > 0) {
                            //把缓存区的数据转成字符串
                            String msg = new String(byteBuffer.array());
                            System.out.println("form 客户端: " + msg);
    
                            //向其它的客户端转发消息(去掉自己), 专门写一个方法来处理
                            sendInfoToOtherClients(msg, channel);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
    
                        try {
                            System.out.println(channel.getRemoteAddress() + " 离线了..");
                            //取消注册
                            key.cancel();
                            //关闭通道
                            channel.close();
                        } catch (IOException e2) {
                            e2.printStackTrace();
                        }
                    }
                }
            });
        }
    
        private void sendInfoToOtherClients(String msg, SocketChannel channel) throws IOException {
            System.out.println("服务器转发消息中...");
            System.out.println("当前线程: " + Thread.currentThread().getName());
    
            try {
                Thread.sleep(5 * 1000);
            } catch (InterruptedException e) {
                // ignore
            }
    
            for (SelectionKey key : selector.keys()) {
                // 通过 key  取出对应的 SocketChannel
                Channel targetChannel = key.channel();
                // 排除掉自己
                if (targetChannel instanceof SocketChannel && targetChannel != channel) {
                    // 转型
                    SocketChannel dest = (SocketChannel) targetChannel;
                    // 将msg 存储到buffer
                    ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes());
                    // 将buffer 的数据写入 通道
                    dest.write(buffer);
                    System.out.println("转发给: " + dest.getRemoteAddress());
                }
            }
        }
    }

    客户端代码同上面。

      这里需要提别注意,在SUbReactor 子线程的run 方法中调用selector.select() 会一直阻塞。可以用好

     方案优缺点:

    优点:

    (1) 父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。

    (2) 父线程与子线程的数据交互简单,Reactor 主线程只需要把新连接传给子线程,子线程无需返回数据。

    缺点:编程复杂度较高

    结合实例:这种模型在许多项目中广泛使用,包括 Nginx 主从 Reactor 多进程模型,Memcached 主从多线程,Netty 主从多线程模型的支持

     4. netty 模型

      Netty 主要基于主从 Reactors 多线程模型做了一定的改进,其中主从 Reactor 多线程模型有多个 Reactor。

    1. 简单理解

    (1) BossGroup 线程维护Selector , 只关注Accecpt

    (2) 当接收到Accept事件,获取到对应的SocketChannel, 封装成 NIOScoketChannel并注册到Worker 线程(事件循环), 并进行维护

    (3) 当Worker线程监听到selector 中通道发生自己感兴趣的事件后,就进行处理(就由handler), 此时handler 已经加入到通道

    2. 模型理解

    (1)  Netty抽象出两组线程池,BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写

    (2) BossGroup和WorkerGroup类型的本质都是NioEventLoopGroup类型。

    (3) NioEventLoopGroup相当于一个事件循环组,它下面维护很多个NioEventLoop(事件循环)。

    (4) NioEventLoop表示一个不断循环的执行处理任务的线程。每个NioEventLoop都包含一个Selector,用于监听绑定在它上面的socket通讯。

    (5) 每个Boss NioEventLoop循环执行的步骤有3步:

    1》轮询accept事件

    2》处理accept事件,与client建立连接,生成NioSocketChannel,并将其注册到某个Worker NioEventLoop的selector上

    3》处理任务队列到任务,即runAllTasks

    (6) 每个Worker NioEventLoop循环执行的步骤:

    1》轮询read,write事件

    2》处理I/O事件,即read,write事件,在对应的NioSocketChannel中进行处理

    3》处理任务队列的任务,即runAllTasks

    (7) 每个 Worker NioEventLoop处理业务时,会使用pipeline(管道),pipeline中包含了channel 通道,即同pipeline可以获取到channel通道。同时管道中维护了很多处理器。

    【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
  • 相关阅读:
    2021NUAA暑假集训 Day3 题解
    2021NUAA暑假集训 Day2 题解
    2021NUAA暑期模拟赛部分题解
    CodeForces 1038D Slime
    UVA 11149 Power of Matrix
    UVA 10655 Contemplation! Algebra
    UVA 10689 Yet another Number Sequence
    HDU 4549 M斐波那契数列
    HDU 4990 Reading comprehension
    CodeForces 450B Jzzhu and Sequences
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/14465508.html
Copyright © 2011-2022 走看看