zoukankan      html  css  js  c++  java
  • Netty学习之Reactor线程模型

    一、什么是Reactor模型

      Reactor设计模式是event-driven architecture(事件驱动)的一种实现方式。Reactor会解耦并发请求的服务并分发给对应的事件处理器来处理。

      目前,许多流行的开源框架都用到了Reactor模型。如:netty、node.js等,包括java的nio。

    二、基于IO事件驱动的分发处理模型

      1)分而治之

      一个连接里完整的网络处理过程一般分为accept、read、decode、process、encode、send这几步。

      Reactor模式将每个步骤映射为一个Task,服务端线程执行的最小逻辑单元不再是一次完整的网络请求,而是Task,且采用非阻塞方式执行。

      2)事件驱动

      每个Task对应特定网络事件。当Task准备就绪时,Reactor收到对应的网络事件通知,并将Task分发给绑定了对应网络事件的Handler执行。

      3)几个角色

      reactor:负责绑定管理事件和处理接口;

      selector:负责监听响应事件,将事件分发给绑定了该事件的Handler处理;

      Handler:事件处理器,绑定了某类事件,负责执行对应事件的Task对事件进行处理;

      Acceptor:Handler的一种,绑定了connect事件。当客户端发起connect请求时,Reactor会将accept事件分发给Acceptor处理。

    三、Reactor三种线程模型

      Netty是典型的Reactor模型结构,常见的Reactor线程模型有三种,分别是:Reactor单线程模型;Reactor多线程模型;主从Reactor多线程模型。

      1、单线程模型

      Reactor单线程模型,指的是所有的I/O操作都在同一个NIO线程上面完成,NIO线程的职责如下:

    • 作为NIO服务端,接收客户端的TCP连接;
    • 作为NIO客户端,向服务端发起TCP连接;
    • 读取通信对端的请求或者应答消息;
    • 向通信对端发送消息请求或者应答消息;

      Reactor线程是个多面手,负责多路分离套接字,Accept新连接,并分派请求到处理器链中。该模型 适用于处理器链中业务处理组件能快速完成的场景。不过,这种单线程模型不能充分利用多核资源,所以实际使用的不多。如图所示:所有的处理操作Reactor、Acceptor、Handler都是一个线程实现。

                    

       服务端线程启动代码如下:

    public class ReactorServer {
        public static void main(String[] args) throws Exception{
            new Thread(new Reactor(8080),"reactor-001").start();
        }
    }

      Reactor线程:

    public class Reactor implements Runnable {
    
        private final Selector selector;
        private final ServerSocketChannel serverSocketChannel;
    
        public Reactor(int port) throws IOException { //Reactor初始化
            selector = Selector.open(); //打开一个Selector
            serverSocketChannel = ServerSocketChannel.open(); //建立一个Server端通道
            serverSocketChannel.socket().bind(new InetSocketAddress(port)); //绑定服务端口
            serverSocketChannel.configureBlocking(false); //selector模式下,所有通道必须是非阻塞的
            //Reactor是入口,最初给一个channel注册上去的事件都是accept
            SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            //attach callback object, Acceptor
            sk.attach(new Acceptor(serverSocketChannel, selector));//绑定接收事件处理器
        }
    
        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    selector.select(); //就绪事件到达之前,阻塞
                    Set selected = selector.selectedKeys(); //拿到本次select获取的就绪事件
                    Iterator it = selected.iterator();
                    while (it.hasNext()) {
                        //这里进行任务分发
                        dispatch((SelectionKey) (it.next()));
                    }
                    selected.clear();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                if (selector != null) {
                    try {
                        selector.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        void dispatch(SelectionKey k) {
    
            Runnable r = (Runnable) (k.attachment()); //这里很关键,拿到每次selectKey里面附带的处理对象,然后调用其run,这个对象在具体的Handler里会进行创建,初始化的附带对象为Acceptor(看上面构造器)
            //调用之前注册的callback对象
            if (r != null) {
                r.run();//只是拿到句柄执行run方法,并没有新起线程
            }
        }
    }

      服务端Acceptor连接建立:

    public class Acceptor implements Runnable {
    
        private final Selector selector;
    
        private final ServerSocketChannel serverSocketChannel;
    
        Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {
            this.serverSocketChannel = serverSocketChannel;
            this.selector = selector;
        }
    
        @Override
        public void run() {
            SocketChannel socketChannel;
            try {
                socketChannel = serverSocketChannel.accept();   //三次握手
                if (socketChannel != null) {
                    System.out.println(String.format("收到来自 %s 的连接",
                            socketChannel.getRemoteAddress()));
    
                    new Handler(socketChannel, selector); //这里把客户端通道传给Handler,
                    // Handler负责接下来的事件处理(除了连接事件以外的事件均可)
              // new AsyncHandler(socketChannel, selector);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

      服务端Handler代码:

    public class Handler implements Runnable {
    
        private final SelectionKey selectionKey;
        private final SocketChannel socketChannel;
    
        private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        private ByteBuffer sendBuffer = ByteBuffer.allocate(2048);
    
        private final static int READ = 0;
        private final static int SEND = 1;
    
        private int status = READ;
    
        public Handler(SocketChannel socketChannel, Selector selector) throws IOException {
            this.socketChannel = socketChannel; //接收客户端连接
            this.socketChannel.configureBlocking(false); //置为非阻塞模式(selector仅允非阻塞模式)
    
            selectionKey = socketChannel.register(selector, 0); //将该客户端注册到selector,得到一个SelectionKey,以后的select到的就绪动作全都是由该对象进行封装
            selectionKey.attach(this); //附加处理对象,当前是Handler对象,run是对象处理业务的方法
            selectionKey.interestOps(SelectionKey.OP_READ); //走到这里,说明之前Acceptor里的建连已完成,那么接下来就是读取动作,因此这里首先将读事件标记为“感兴趣”事件
            selector.wakeup(); //让阻塞的selector立即返回  ----> selector.select()
        }
    
        @Override
        public void run() {
            try {
                switch (status) {
                    case READ:
                        read();
                        break;
                    case SEND:
                        send();
                        break;
                    default:
                }
            } catch (IOException e) { //这里的异常处理是做了汇总,常出的异常就是server端还有未读/写完的客户端消息,客户端就主动断开连接,这种情况下是不会触发返回-1的,这样下面read和write方法里的cancel和close就都无法触发,这样会导致死循环异常(read/write处理失败,事件又未被cancel,因此会不断的被select到,不断的报异常)
                System.err.println("read或send时发生异常!异常信息:" + e.getMessage());
                selectionKey.cancel();
                try {
                    socketChannel.close();
                } catch (IOException e2) {
                    System.err.println("关闭通道时发生异常!异常信息:" + e2.getMessage());
                    e2.printStackTrace();
                }
            }
        }
    
        private void read() throws IOException {
            if (selectionKey.isValid()) {
                readBuffer.clear();
                int count = socketChannel.read(readBuffer); //read方法结束,意味着本次"读就绪"变为"读完毕",标记着一次就绪事件的结束
                if (count > 0) {
                    System.out.println(String.format("收到来自 %s 的消息: %s",
                            socketChannel.getRemoteAddress(),new String(readBuffer.array())));
                    status = SEND;
                    selectionKey.interestOps(SelectionKey.OP_WRITE); //注册写方法
                } else {
                    //读模式下拿到的值是-1,说明客户端已经断开连接,那么将对应的selectKey从selector里清除,否则下次还会select到,因为断开连接意味着读就绪不会变成读完毕,也不cancel,下次select会不停收到该事件
                    //所以在这种场景下,(服务器程序)你需要关闭socketChannel并且取消key,最好是退出当前函数。注意,这个时候服务端要是继续使用该socketChannel进行读操作的话,就会抛出“远程主机强迫关闭一个现有的连接”的IO异常。
                    selectionKey.cancel();
                    socketChannel.close();
                    System.out.println("read时-------连接关闭");
                }
            }
        }
    
        void send() throws IOException {
            if (selectionKey.isValid()) {
                sendBuffer.clear();
                sendBuffer.put(String.format("我收到来自%s的信息辣:%s,  200ok;",
                        socketChannel.getRemoteAddress(),
                        new String(readBuffer.array())).getBytes());
                sendBuffer.flip();
                int count = socketChannel.write(sendBuffer); //write方法结束,
                // 意味着本次写就绪变为写完毕,标记着一次事件的结束
    
                if (count < 0) {
                    //同上,write场景下,取到-1,也意味着客户端断开连接
                    selectionKey.cancel();
                    socketChannel.close();
                    System.out.println("send时-------连接关闭");
                }
    
                //没断开连接,则再次切换到读
                status = READ;
                selectionKey.interestOps(SelectionKey.OP_READ);
            }
        }
    }

      对于一些小容量应用场景,可以使用单线程模型,但是对于高负载、大并发的应用却不合适,主要原因如下:

    • 一个NIO线程同时处理成百上千的链路,性能上无法支撑。即便NIO线程的CPU负荷达到100%,也无法满足海量消息的编码、解码、读取和发送;
    • 当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往进行重发,这更加重了NIO线程的负载,最终导致大量消息积压和处理超时,NIO线程会成为系统的性能瓶颈;
    • 可靠性问题。一旦NIO线程意外跑飞,或者进入死循环,会导致整个系统通讯模块不可用,不能接收和处理外部信息,造成节点故障。

      为了解决这些问题,演进出了Reactor多线程模型,下面我们一起学习下Reactor多线程模型。

      2、多线程模型

      Reactor多线程模型与单线程模型最大区别就是有一组NIO线程处理I/O操作,它的特点如下:

    • 有一个专门的NIO线程--acceptor新城用于监听服务端,接收客户端的TCP连接请求;
    • 网络I/O操作--读、写等由一个NIO线程池负责,线程池可以采用标准的JDK线程池实现,它包含一个任务队列和N个可用的线程,由这些NIO线程负责消息的读取、解码、编码和发送;
    • 1个NIO线程可以同时处理N条链路,但是1个链路只对应1个NIO线程,防止发生并发操作问题。

      如图所示:Reactor、Acceptor的处理操作是一个线程实现。Handler是另一个线程实现。

                     

       因此,如代码所示,其余代码一致,重写Handler为:

    public class AsyncHandler implements Runnable {
    
        private final Selector selector;
    
        private final SelectionKey selectionKey;
        private final SocketChannel socketChannel;
    
        private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
        private ByteBuffer sendBuffer = ByteBuffer.allocate(2048);
    
        private final static int READ = 0; //读取就绪
        private final static int SEND = 1; //响应就绪
        private final static int PROCESSING = 2; //处理中
    
        private int status = READ; //所有连接完成后都是从一个读取动作开始的
    
        //开启线程数为5的异步处理线程池
        private static final ExecutorService workers = Executors.newFixedThreadPool(5);
    
        public AsyncHandler(SocketChannel socketChannel, Selector selector) throws IOException {
            this.socketChannel = socketChannel;
            this.socketChannel.configureBlocking(false);
            selectionKey = socketChannel.register(selector, 0);
            selectionKey.attach(this);
            selectionKey.interestOps(SelectionKey.OP_READ);
            this.selector = selector;
            this.selector.wakeup();
        }
    
        @Override
        public void run() { //如果一个任务正在异步处理,那么这个run是直接不触发任何处理的,read和send只负责简单的数据读取和响应,业务处理完全不阻塞这里的处理
            switch (status) {
                case READ:
                    read();
                    break;
                case SEND:
                    send();
                    break;
                default:
            }
        }
    
        private void read() {
            if (selectionKey.isValid()) {
                try {
                    readBuffer.clear();
                    int count = socketChannel.read(readBuffer);
                    if (count > 0) {
                        status = PROCESSING; //置为处理中,处理完成后该状态为响应,表示读入处理完成,接下来可以响应客户端了
                        workers.execute(this::readWorker); //异步处理
                    } else {
                        selectionKey.cancel();
                        socketChannel.close();
                        System.out.println("read时-------连接关闭");
                    }
                } catch (IOException e) {
                    System.err.println("处理read业务时发生异常!异常信息:" + e.getMessage());
                    selectionKey.cancel();
                    try {
                        socketChannel.close();
                    } catch (IOException e1) {
                        System.err.println("处理read业务关闭通道时发生异常!异常信息:" + e.getMessage());
                    }
                }
            }
        }
    
        void send() {
            if (selectionKey.isValid()) {
                status = PROCESSING; //置为执行中
                workers.execute(this::sendWorker); //异步处理
                selectionKey.interestOps(SelectionKey.OP_READ); //重新设置为读
            }
        }
    
        //读入信息后的业务处理
        private void readWorker() {
    //        try {
    //            Thread.sleep(5000L);
    //        } catch (InterruptedException e) {
    //            e.printStackTrace();
    //        }
            System.out.println(String.format("收到来自客户端的消息: %s",
                    new String(readBuffer.array())));
    
            status = SEND;
    
            selectionKey.interestOps(SelectionKey.OP_WRITE); //把当前事件改为写事件
            this.selector.wakeup(); //唤醒阻塞在select的线程,
            // 因为该interestOps写事件是放到子线程的,
            // select在该channel还是对read事件感兴趣时又被调用
            // ,因此如果不主动唤醒,
            // select可能并不会立刻select该读就绪事件(在该例中,可能永远不会被select到)
        }
    
        private void sendWorker() {
            try {
                sendBuffer.clear();
                sendBuffer.put(String.format("我收到来自%s的信息辣:%s,  200ok;",
                        socketChannel.getRemoteAddress(),
                        new String(readBuffer.array())).getBytes());
                sendBuffer.flip();
    
                int count = socketChannel.write(sendBuffer);
    
                if (count < 0) {
                    selectionKey.cancel();
                    socketChannel.close();
                    System.out.println("send时-------连接关闭");
                } else {
                    //再次切换到读
                    status = READ;
                }
            } catch (IOException e) {
                System.err.println("异步处理send业务时发生异常!异常信息:" + e.getMessage());
                selectionKey.cancel();
                try {
                    socketChannel.close();
                } catch (IOException e1) {
                    System.err.println("异步处理send业务关闭通道时发生异常!异常信息:" + e.getMessage());
                }
            }
        }
    }

      在绝大多数场景下,Reactor多线程模型都可以满足性能需求;但是,在极特殊应用场景中,一个NIO线程负责监听和处理所有的客户端连接可能会存在性能问题。例如百万客户端并发连接,或者服务端需要对客户端的握手信息进行安全认证,认证本身非常损耗性能。这类场景下,单独一个Acceptor线程可能会存在性能不足问题,为了解决性能问题,产生了第三种Reactor线程模型--主从Reactor多线程模型。

      3、主从多线程模型

      特点是:服务端用于接收客户端连接的不再是1个单独的NIO线程,而是一个独立的NIO线程池。Acceptor接收到客户端TCP连接请求处理完成后(可能包含接入认证等),将新创建的SocketChannel注册到I/O线程池(sub reactor线程池)的某个I/O线程上,由它负责SocketChannel的读写和编解码工作。

      Acceptor线程池只用于客户端的登录、握手和安全认证,一旦链路建立成功,就将链路注册到后端subReactor线程池的I/O线程上,有I/O线程负责后续的I/O操作。

      第三种模型比起第二种模型,是将Reactor分成两部分,mainReactor负责监听server socket,accept新连接,并将建立的socket分派给subReactor。subReactor负责多路分离已连接的socket,读写网络数据,对业务处理功能,其扔给worker线程池完成。通常,subReactor个数上可与CPU个数等同。

      如下图所示,

                 

       新增SubReactor部分:

    public class SubReactor implements Runnable {
        private final Selector selector;
        private boolean register = false; //注册开关表示,为什么要加这么个东西,可以参考Acceptor设置这个值那里的描述
        private int num; //序号,也就是Acceptor初始化SubReactor时的下标
    
        SubReactor(Selector selector, int num) {
            this.selector = selector;
            this.num = num;
        }
    
        @Override
        public void run() {
            while (!Thread.interrupted()) {
                System.out.println(String.format("%d号SubReactor等待注册中...", num));
                while (!Thread.interrupted() && !register) {
                    try {
                        if (selector.select() == 0) {
                            continue;
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    Iterator it = selectedKeys.iterator();
                    while (it.hasNext()) {
                        dispatch((SelectionKey)it.next());
                        it.remove();
                    }
                }
            }
        }
    
        private void dispatch(SelectionKey key) {
            Runnable r = (Runnable) (key.attachment());
            if (r != null) {
                r.run();
            }
        }
    
        void registering(boolean register) {
            this.register = register;
        }
    
    }

      重写Acceptor部分:

    public class Acceptor implements Runnable {
    
        private final ServerSocketChannel serverSocketChannel;
    
        private final int coreNum = Runtime.getRuntime().availableProcessors(); // 获取CPU核心数
    
        private final Selector[] selectors = new Selector[coreNum]; // 创建selector给SubReactor使用,个数为CPU核心数(如果不需要那么多可以自定义,毕竟这里会吞掉一个线程)
    
        private int next = 0; // 轮询使用subReactor的下标索引
    
        private SubReactor[] subReactors = new SubReactor[coreNum]; // subReactor
    
        private Thread[] threads = new Thread[coreNum]; // subReactor的处理线程
    
        Acceptor(ServerSocketChannel serverSocketChannel) throws IOException {
            this.serverSocketChannel = serverSocketChannel;
            // 初始化
            for (int i = 0; i < coreNum; i++) {
                selectors[i] = Selector.open();
                subReactors[i] = new SubReactor(selectors[i], i); //初始化sub reactor
                threads[i] = new Thread(subReactors[i]); //初始化运行sub reactor的线程
                threads[i].start(); //启动(启动后的执行参考SubReactor里的run方法)
            }
        }
    
        @Override
        public void run() {
            SocketChannel socketChannel;
            try {
                socketChannel = serverSocketChannel.accept(); // 阻塞获取连接
                if (socketChannel != null) {
                    //轮询reactors[] 处理接收到的请求
                    System.out.println(String.format("收到来自 %s 的连接",socketChannel.getRemoteAddress()));
                    socketChannel.configureBlocking(false); //
    
                    subReactors[next].registering(true);
                    /*让线下一次subReactors的while循环不去执行
                     selector.select,但是select我们是使用的不超时阻塞的方式,
                     所以下一步需要执行wakeup()
                     * */
    
                    selectors[next].wakeup(); //使一個阻塞住的selector操作立即返回
    
                    SelectionKey selectionKey = socketChannel.register(selectors[next],
                            SelectionKey.OP_READ); // 当前客户端通道SocketChannel
                    // 向selector[next]注册一个读事件,返回key
    
                    selectors[next].wakeup();
                    /*使一個阻塞住的selector操作立即返回,这样才能对刚刚注册的SelectionKey感兴趣
                    */
    
                    subReactors[next].registering(false); // 本次事件注册完成后,需要再次触发select的执行
                    // ,因此这里Restart要在设置回false(具体参考SubReactor里的run方法)
                    selectionKey.attach(new AsyncHandler(socketChannel, selectors[next]));
                    // 绑定Handler
    
                    //轮询负载
                    if (++next == selectors.length) {
                        next = 0; //越界后重新分配
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }   

      四、NioEventLoopGroup 与 Reactor 线程模型的对应

      Netty的线程模型并发固定不变,通过在启动辅助类中创建不同的EventLoopGroup实例并进行适当的参数配置,就可以支持上述三种Reactor线程模型。

    /**
         * Netty单线程模型服务端代码示例
         * @param port
         */
        public void bind(int port) {
            EventLoopGroup reactorGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(reactorGroup, reactorGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast("http-codec", new HttpServerCodec());
                            ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
                            ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                            //后面代码省略
                        }
                    });
            
                Channel ch = b.bind(port).sync().channel();
                ch.closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                reactorGroup.shutdownGracefully();
            }
        }
    /**
         * Netty多线程模型代码
         * @param port
         */
        public void bind2(int port) {
            EventLoopGroup acceptorGroup = new NioEventLoopGroup(1);
            NioEventLoopGroup ioGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(acceptorGroup, ioGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast("http-codec", new HttpServerCodec());
                            ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
                            ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                            //后面代码省略
                        }
                    });
            
                Channel ch = b.bind(port).sync().channel();
                ch.closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                acceptorGroup.shutdownGracefully();
                ioGroup.shutdownGracefully();
            }
        }
    /**
         * Netty主从线程模型代码
         * @param port
         */
        public void bind3(int port) {
            EventLoopGroup acceptorGroup = new NioEventLoopGroup();
            NioEventLoopGroup ioGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(acceptorGroup, ioGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast("http-codec", new HttpServerCodec());
                            ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
                            ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
                            //后面代码省略
                        }
                    });
            
                Channel ch = b.bind(port).sync().channel();
                ch.closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                acceptorGroup.shutdownGracefully();
                ioGroup.shutdownGracefully();
            }
        }

      说完Reacotr模型的三种形式,那么Netty是哪种呢?其实,我还有一种Reactor模型的变种没说,那就是去掉线程池的第三种形式的变种,这也 是Netty NIO的默认模式。在实现上,Netty中的Boss类充当mainReactor,NioWorker类充当subReactor(默认 NioWorker的个数是Runtime.getRuntime().availableProcessors())。在处理新来的请求 时,NioWorker读完已收到的数据到ChannelBuffer中,之后触发ChannelPipeline中的ChannelHandler流。

      Netty是事件驱动的,可以通过ChannelHandler链来控制执行流向。因为ChannelHandler链的执行过程是在 subReactor中同步的,所以如果业务处理handler耗时长,将严重影响可支持的并发数。这种模型适合于像Memcache这样的应用场景,但 对需要操作数据库或者和其他模块阻塞交互的系统就不是很合适。Netty的可扩展性非常好,而像ChannelHandler线程池化的需要,可以通过在 ChannelPipeline中添加Netty内置的ChannelHandler实现类–ExecutionHandler实现,对使用者来说只是 添加一行代码而已。对于ExecutionHandler需要的线程池模型,Netty提供了两种可 选:1) MemoryAwareThreadPoolExecutor 可控制Executor中待处理任务的上限(超过上限时,后续进来的任务将被阻 塞),并可控制单个Channel待处理任务的上限;2) OrderedMemoryAwareThreadPoolExecutor 是  MemoryAwareThreadPoolExecutor 的子类,它还可以保证同一Channel中处理的事件流的顺序性,这主要是控制事件在异步处 理模式下可能出现的错误的事件顺序,但它并不保证同一Channel中的事件都在一个线程中执行(通常也没必要)。一般来 说,OrderedMemoryAwareThreadPoolExecutor 是个很不错的选择。

  • 相关阅读:
    Python统计字符串中出现次数最多的人名
    初探CORBA组件化编程
    shell脚本—基础知识,变量
    Java多线程--线程交替
    Qt中采用多线程实现Socket编程
    Python字符串格式化--formate()的应用
    JAVA中浅复制与深复制
    Python这些问题你会吗?
    PHP控制反转(IOC)和依赖注入(DI
    Go 语言指针
  • 原文地址:https://www.cnblogs.com/jing99/p/12498783.html
Copyright © 2011-2022 走看看