zoukankan      html  css  js  c++  java
  • IO学习笔记7

    2.4 多路复用javaAPI

    在上面我们简单java代码实现了多路复用,是一个单线程版的。讲上面的epoll代码复制到linux服务器中,使用strace追踪系统调用。

    javaAPI会根据系统类型自动选择最优的多路复用器,因此在linux下默认就是使用epoll模型,也可以通过启动参数指定多路复用器模型。

    # 因为默认是epoll,所以系统调用起名为epoll开头
    strace -ff -o epoll java SingletonMultiplexingIO
    # 再指定poll 起名为poll开头的
    strace -ff -o poll java -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.PollSelectorProvider SingletonMultiplexingIO
    

    执行完上面两条命令后就可以得到如下文件:

    total 2160
    -rw-rw-r-- 1 xz xz   13911 Jun 22 17:13 epoll.32474
    -rw-rw-r-- 1 xz xz  526239 Jun 22 17:13 epoll.32475
    -rw-rw-r-- 1 xz xz   62970 Jun 22 17:13 epoll.32476
    -rw-rw-r-- 1 xz xz    1033 Jun 22 17:13 epoll.32477
    -rw-rw-r-- 1 xz xz    1157 Jun 22 17:13 epoll.32478
    -rw-rw-r-- 1 xz xz    2183 Jun 22 17:13 epoll.32479
    -rw-rw-r-- 1 xz xz   28151 Jun 22 17:13 epoll.32480
    -rw-rw-r-- 1 xz xz   20128 Jun 22 17:13 epoll.32481
    -rw-rw-r-- 1 xz xz    1033 Jun 22 17:13 epoll.32482
    -rw-rw-r-- 1 xz xz 1214432 Jun 22 17:13 epoll.32483
    -rw-rw-r-- 1 xz xz    1935 Jun 22 17:13 epoll.32589
    -rw-rw-r-- 1 xz xz   13661 Jun 22 17:13 poll.32595
    -rw-rw-r-- 1 xz xz    9633 Jun 22 17:13 poll.32596
    -rw-rw-r-- 1 xz xz   13954 Jun 22 17:13 poll.32602
    -rw-rw-r-- 1 xz xz  206806 Jun 22 17:13 poll.32603
    -rw-rw-r-- 1 xz xz    2121 Jun 22 17:13 poll.32604
    -rw-rw-r-- 1 xz xz    1033 Jun 22 17:13 poll.32605
    -rw-rw-r-- 1 xz xz    1157 Jun 22 17:13 poll.32606
    -rw-rw-r-- 1 xz xz    2342 Jun 22 17:13 poll.32607
    -rw-rw-r-- 1 xz xz    6577 Jun 22 17:13 poll.32608
    -rw-rw-r-- 1 xz xz    3897 Jun 22 17:13 poll.32609
    -rw-rw-r-- 1 xz xz    1033 Jun 22 17:13 poll.32610
    -rw-rw-r-- 1 xz xz   10674 Jun 22 17:13 poll.32611
    -rw-rw-r-- 1 xz xz    1935 Jun 22 17:13 poll.32612
    -rw-rw-r-- 1 xz xz    3317 Jun 22 17:07 SingletonMultiplexingIO.class
    -rw-rw-r-- 1 xz xz    3520 Jun 22 17:06 SingletonMultiplexingIO.java
    

    查看epoll.32475:

    ...
    # 首先创建一个socket套接字,得到文件描述符4
    socket(PF_INET, SOCK_STREAM, IPPROTO_IP) = 4
    ...
    # 绑定端口9000到文件描述符4上,并监听文件描述符4
    bind(4, {sa_family=AF_INET, sin_port=htons(9000), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
    listen(4, 20) 
    ...
    # 设置非阻塞
    fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
    ...
    # 打印创建socket成功
    write(1, "create socket 9000 successful", 29) = 29
    # 建立一个管道,占用文件描述符5,6,这两个文件描述符先不管
    ...
    pipe([5, 6])                            = 0
    ...
    # 创建epoll,得到文件描述符7 这个7就是epoll在内核空间开辟的红黑树对应的文件描述符
    epoll_create(256)                       = 7
    ...
    # 调用epoll_ctl,将5添加到epoll中,这个不管
    epoll_ctl(7, EPOLL_CTL_ADD, 5, {EPOLLIN, {u32=5, u64=4613014228473741317}}) = 0
    write(1, "create multiplexing successful", 30) = 30
    ...
    write(1, "register socket to epoll success"..., 35) = 35
    ...
    # 将4通过 epoll_ctl 添加(EPOLL_CTL_ADD)到epoll中。这里注意,我们代码中,
    # 打印: register socket to epoll success 这句话位置是在socket创建完成后,selector.select()代码执行前
    # 但是在实际的调用中,注册4到epoll中在那句话后面打印的,因此实际执行中,在第一次执行 selector.select()时才注册 serverSocket到epoll中,其实是懒加载
    epoll_ctl(7, EPOLL_CTL_ADD, 4, {EPOLLIN, {u32=4, u64=5340325089890009092}}) = 0
    # selector.select(50) 对应的就是下面的epoll_wait
    epoll_wait(7, {}, 8192, 50)             = 0
    epoll_wait(7, {}, 8192, 50)             = 0
    ...
    

    然后再看poll.32596:

    ...
    # 首先创建一个socket套接字,得到文件描述符4
    socket(PF_INET6, SOCK_STREAM, IPPROTO_IP) = 4
    ...
    # 绑定端口9000到文件描述符4上,并监听文件描述符4
    bind(4, {sa_family=AF_INET, sin_port=htons(9000), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
    listen(4, 20)
    ...
    # 设置非阻塞
    fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
    ...
    write(1, "create socket 9000 successful", 29) = 29
    pipe([5, 6])                            = 0
    write(1, "create multiplexing successful", 30) = 30
    write(1, "register socket to epoll success"..., 35) = 35
    poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}], 2, 50) = 0 (Timeout)
    

    通过上面两个文件,可以很详细的看出两种方式的系统调用的区别。

    然后我们来看如何使用java代码实现多路复用器:

    单线程多路复用器

    也就是上面我们做测试用的多路复用器:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    /**
     * @author shuai.zhao@going-link.com
     * @date 2021/6/15
     */
    public class SingletonMultiplexingIO {
    
        private Selector selector;
    
        public void init() {
            try {
                // 创建服务端 socket 系统调用
                // socket(...)  = 4
                ServerSocketChannel ssc = ServerSocketChannel.open();
                // bind(4, 9000)
                // listen(4, 20)
                ssc.bind(new InetSocketAddress(9000), 20);
                // 设置为非阻塞
                // fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
                ssc.configureBlocking(false);
                System.out.println("create socket 9000 successful");
    
                // 创建多路复用器, 如果是epoll 则相当于调用 epoll_create
                selector = Selector.open();
                System.out.println("create multiplexing successful");
                // 向多路复用器注册fd
                // 代码虽然写在这里,但是实际调用是在下面第一次调用 selector.select()
                // 原因是懒加载
                ssc.register(selector, SelectionKey.OP_ACCEPT);
                System.out.println("register socket to epoll successful");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public void start() {
            init();
            System.out.println("服务端启动了");
            while (true) {
                try {
                    // selector.select()
                    // 如果使用 select/poll 模型的话,那么就相当于调用select/poll
                    // 如果使用epoll,就相当于调用epoll_wait
                    while (selector.select(50) > 0) {
                        // 获取有状态的fd
                        // poll的话就是传一堆fd过去,筛选出有状态的fd
                        // epoll就是返回链表中的fd
                        Set<SelectionKey> selectionKeys = selector.selectedKeys();
                        Iterator<SelectionKey> iterator = selectionKeys.iterator();
                        // 迭代处理每一个fd
                        while (iterator.hasNext()) {
                            SelectionKey selectionKey = iterator.next();
                            // 每处理一个fd就从集合中remove掉
                            iterator.remove();
                            if (selectionKey.isAcceptable()) {
                                // 是可接收的fd
                                acceptHandler(selectionKey);
                            } else if (selectionKey.isReadable()) {
                                // 是可读的fd
                                readHandler(selectionKey);
                            }
    //                        else if (selectionKey.isWritable()) {
    //                            暂时先不考虑此状态,但是要知道有这个状态
    //                        }
                        }
                    }
                } catch (Exception ignore) {
                }
            }
        }
    
        public void acceptHandler(SelectionKey selectionKey) throws IOException {
            ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
            SocketChannel client = ssc.accept();
            client.configureBlocking(false);
    
            ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
            // 将这个可读的fd注册到红黑树中
            // epoll_ctl( fd, EPOLL_CTL_ADD...
            client.register(selector, SelectionKey.OP_READ, buffer);
    
            System.out.println("新客户端:" + client.getRemoteAddress());
        }
    
        public void readHandler(SelectionKey selectionKey) throws IOException {
            SocketChannel client = (SocketChannel) selectionKey.channel();
            ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
            buffer.clear();
    
            while (true) {
                int read = client.read(buffer);
                System.out.println("read = " + read);
                if (read > 0) {
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        // 读到什么东西就写回去什么
                        client.write(buffer);
                    }
                    buffer.clear();
                } else if (read == 0) {
                    break;
                } else {
                    client.close();
                }
            }
        }
    
        public static void main(String[] args) {
            SingletonMultiplexingIO smio = new SingletonMultiplexingIO();
            smio.start();
        }
    }
    

    来看上面这种单线程的多路复用器代码,这个版本的实现只有一个线程,也就是说所有的fd都是在一个线程中处理的,这样如果有一个线程执行时间耗时较长的话(在readHandler中处理时间过长)就会导致其他的fd都会被阻塞住往后延迟。因此我们可以很自然的想到,在处理读写事件时,使用多线程去处理。也就是下面这种方式:

    基于多线程的多路复用器V1(version 1)
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.util.Iterator;
    import java.util.Set;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * @author shuai.zhao@going-link.com
     * @date 2021/6/22
     */
    public class MultiplexingIOThreads {
    
        /**
         * 线程池,处理事件
         */
        private ExecutorService executor = Executors.newFixedThreadPool(5);
    
        private Selector selector;
    
        /**
         * 初始化
         */
        private void init() {
            try {
                ServerSocketChannel ssc = ServerSocketChannel.open();
                ssc.bind(new InetSocketAddress(9000), 20);
                ssc.configureBlocking(false);
    
                this.selector = Selector.open();
                ssc.register(this.selector, SelectionKey.OP_ACCEPT);
            } catch (IOException ignore) {
            }
        }
    
        private void start() {
            this.init();
    
            try {
                while (true) {
                    while (selector.select(50) > 0) {
                        Set<SelectionKey> selectionKeys = selector.selectedKeys();
                        Iterator<SelectionKey> ski = selectionKeys.iterator();
                        while (ski.hasNext()) {
                            SelectionKey selectionKey = ski.next();
                            ski.remove();
    
                            if (selectionKey.isAcceptable()) {
                                acceptHandler(selectionKey);
                            } else if (selectionKey.isReadable()) {
                              // selectionKey.cancel() 取消key
                                selectionKey.cancel();
                                System.out.println("**selectionKey cancel**");
                                readHandler(selectionKey);
                            } else if (selectionKey.isWritable()) {
    
                                selectionKey.cancel();
                                writeHandler(selectionKey);
                            }
                        }
                    }
                }
            } catch (Exception ignore) {
            }
        }
    
        private void writeHandler(SelectionKey selectionKey) {
            executor.execute(() -> {
                System.out.println("writeHandler ...");
                SocketChannel sc = (SocketChannel) selectionKey.channel();
                ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
                buffer.flip();
                try {
                    sc.write(buffer);
                } catch (IOException ignore) {
    
                }
                
                // 数据写完,将此fd重新置为可读状态
                buffer.clear();
                try {
                    sc.register(selector, SelectionKey.OP_READ, buffer);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            });
        }
    
        private void readHandler(SelectionKey selectionKey) {
            // 另起一个线程处理读事件
            executor.execute(() -> {
                System.out.println("readHandler ...");
                SocketChannel sc = (SocketChannel) selectionKey.channel();
                ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
                buffer.clear();
    
                try {
                    while (true) {
                        // 可以读到数据
                        if (sc.read(buffer) > 0) {
                            // 反转buffer
                            buffer.flip();
                            // 读客户端发送的数据
                            byte[] bytes = new byte[buffer.limit()];
                            buffer.get(bytes);
                            System.out.println("client write: = " + new String(bytes));
    
                            buffer.clear();
                            buffer.put("successful".getBytes());
                            // 读取完数据后,将此链接注册为可写事件
                            sc.register(selector, SelectionKey.OP_WRITE, buffer);
                        } else {
                            break;
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
        }
    
        private void acceptHandler(SelectionKey selectionKey) throws IOException {
    
            ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
            SocketChannel sc = ssc.accept();
            sc.configureBlocking(false);
    
            ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
            sc.register(selector, SelectionKey.OP_READ, buffer);
        }
    
        public static void main(String[] args) {
            MultiplexingIOThreads instance = new MultiplexingIOThreads();
            instance.start();
        }
    }
    

    追踪启动上面的进程,然后使用nc连接服务端,主要是为了查看selectionKey.cancel()这个方法的系统调用:

    # readhandler开辟新线程
    clone(child_stack=0x7f5d048d3fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f5d048d49d0, tls=0x7f5d048d4700, child_tidptr=0x7f5d048d49d0) = 2
    7579
    futex(0x7f5d1404c354, FUTEX_WAIT_PRIVATE, 39, NULL) = 0
    futex(0x7f5d1404c328, FUTEX_WAIT_PRIVATE, 2, NULL) = 0
    futex(0x7f5d1404c328, FUTEX_WAKE_PRIVATE, 1) = 0
    # 下一次执行epoll_wait即select()前先执行删除key操作
    epoll_ctl(7, EPOLL_CTL_DEL, 8, {0, {u32=8, u64=1958317697552875528}}) = 0
    futex(0x7f5d14107154, FUTEX_WAKE_OP_PRIVATE, 1, 1, 0x7f5d14107150, {FUTEX_OP_SET, 0, FUTEX_OP_CMP_GT, 1}) = 1
    futex(0x7f5d14107128, FUTEX_WAKE_PRIVATE, 1) = 0
    # 执行select()
    epoll_wait(7, {}, 8192, 50) 
    

    关于selectionKey.cancel()方法:

    cancel()方法被调用时,会在下一次调用select()方法时将key从红黑树中删除,通过epoll_ctl(7, EPOLL_CTL_DELETE....)删除。

    为什么要调用cancel()

    在之前的版本中,数据的输入和输出操作都是在同一个线程一个方法中进行,我们读完数据立即就执行了写操作,因此当一个方法执行完时,就代表这个key中的数据已经被读完了,但是现在使用多线程处理读写操作,因此可能出现的情况就是:当启动了一个子线程,但是因为需要读的数据非常大,比较耗时,子线程还在读数据是,主线程进入了下一次循环,仍然找到了这个fd,那么就会重新开劈子进程,重新读取fd数据,出现重复读数据的情况。因此当使用这种多线程读时,就需要调用cancel()去删除这个key,这样下次就不会再读到它了。

    但是因为

    1. 线程池大小难控制,多线程间线程切换会影响效率,多线程会有线程安全性问题

    2. cancel()实际调用的是epoll_ctl(DELETE)也是属于系统调用,IO模型的进化 BIO->NIO->多路复用,本就是为了减少 系统调用,因此这种方式违背了初心,不可取。

    所以就进化出了新的多线程模型。

    基于多线程的多路复用器V2

    由于上面版本的多线程的问题,提出的解决方案就是:

    开辟多个selector,每一个selector内部是单线程的,多个selector是并行的。同时利用了多线程和单线程的优点。

    代码实现:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    /**
     * @author shuai.zhao@going-link.com
     * @date 2021/6/30
     */
    public class MultiplexingIOThreads_V2 {
        
        private Selector selector1;
        private Selector selector2;
        private Selector selector3;
    
        void init() {
    
            try {
                ServerSocketChannel ssc = ServerSocketChannel.open();
                ssc.bind(new InetSocketAddress(9000));
                ssc.configureBlocking(false);
    
                selector1 = Selector.open();
                selector2 = Selector.open();
                selector3 = Selector.open();
                
                ssc.register(selector1, SelectionKey.OP_ACCEPT);
                ssc.register(selector2, SelectionKey.OP_ACCEPT);
                ssc.register(selector3, SelectionKey.OP_ACCEPT);
    
            } catch (IOException ignore) {
            }
        }
    
        public static void main(String[] args) {
            MultiplexingIOThreads_V2 mt_v2 = new MultiplexingIOThreads_V2();
            mt_v2.init();
    
            new SelectorThread(mt_v2.selector1, "selector1").start();
            new SelectorThread(mt_v2.selector2, "selector2").start();
            new SelectorThread(mt_v2.selector3, "selector3").start();
        }
    
        /**
         * 每一个selector对应一个单线程
         */
        static class SelectorThread extends Thread{
    
            private Selector selector;
    
            public SelectorThread(Selector selector, String threadName) {
                super(threadName);
                this.selector = selector;
            }
    
            @Override
            public void run() {
                try {
                    while (true) {
                        while (selector.select(50) > 0) {
                            Set<SelectionKey> selectionKeys = selector.selectedKeys();
                            Iterator<SelectionKey> ski = selectionKeys.iterator();
                            while (ski.hasNext()) {
                                SelectionKey selectionKey = ski.next();
                                ski.remove();
    
                                if (selectionKey.isAcceptable()) {
                                    acceptHandler(selectionKey);
                                } else if (selectionKey.isReadable()) {
                                    readHandler(selectionKey);
                                }
                            }
                        }
                    }
                } catch (Exception ignore) {
                }
            }
    
            public void acceptHandler(SelectionKey selectionKey) throws IOException {
                ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
                SocketChannel client = ssc.accept();
                client.configureBlocking(false);
    
                ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
                // 将这个可读的fd注册到红黑树中
                // epoll_ctl( fd, EPOLL_CTL_ADD...
                client.register(selector, SelectionKey.OP_READ, buffer);
    
                System.out.println(this.getName() + "新客户端:" + client.getRemoteAddress());
            }
    
            public void readHandler(SelectionKey selectionKey) throws IOException {
                SocketChannel client = (SocketChannel) selectionKey.channel();
                ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
                buffer.clear();
    
                while (true) {
                    int read = client.read(buffer);
                    System.out.println("read = " + read);
                    if (read > 0) {
                        buffer.flip();
                        while (buffer.hasRemaining()) {
                            // 读到什么东西就写回去什么
                            client.write(buffer);
                        }
                        buffer.clear();
                    } else {
                        break;
                    }
                }
            }
        }
    }
    

    首先看这一版本的多线程实现。多个selector线程共同处理客户端的连接请求。通过前面的C10K去验证此服务端。会发现一个问题:

    因为多个线程的优先级是一样的,因此在客户端连接进来时,是随机分配给每个selector的,一个是不可控问题,可能有的selector注册的特别多,特别忙,有的selector却很闲。我们希望是多个selector协同处理所有的fd。因此我们做一下改动:

    package com.gouxiazhi.io;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.Scanner;
    import java.util.Set;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @author shuai.zhao@going-link.com
     * @date 2021/6/30
     */
    public class MultiplexingIOThreads_V3 {
    
        private Selector selector1;
    
        void init() {
    
            try {
                ServerSocketChannel ssc = ServerSocketChannel.open();
                ssc.bind(new InetSocketAddress(9000));
                ssc.configureBlocking(false);
    
                selector1 = Selector.open();
    
                ssc.register(selector1, SelectionKey.OP_ACCEPT);
    
            } catch (IOException ignore) {
            }
        }
    
        public static void main(String[] args) throws IOException {
            MultiplexingIOThreads_V3 mt_v3 = new MultiplexingIOThreads_V3();
            mt_v3.init();
    
            new SelectorThread("boss", 2, mt_v3.selector1).start();
            new SelectorThread("worker1").start();
            new SelectorThread("worker2").start();
        }
    
        /**
         * 每一个selector对应一个单线程
         */
        static class SelectorThread extends Thread {
    
            /**
             * 统计当前boss所接收的客户端总数
             */
            private static AtomicInteger idx = new AtomicInteger();
            /**
             * 下边有几个小弟
             */
            private static int workers;
            /**
             * 一个小弟分一个任务链
             */
            private volatile static LinkedBlockingQueue<SocketChannel>[] queues;
            /**
             * 干活的小弟
             */
            private Selector selector;
            /**
             * 小弟编号
             */
            private int id;
    
            /**
             * boss线程构造器
             *
             * @param name     线程名
             * @param workers  小弟数量
             * @param selector boss
             */
            public SelectorThread(String name, int workers, Selector selector) {
                super(name);
                this.selector = selector;
                SelectorThread.workers = workers;
                queues = new LinkedBlockingQueue[workers];
                for (int i = 0; i < workers; i++) {
                    queues[i] = new LinkedBlockingQueue<>();
                }
            }
    
            public SelectorThread(String name) throws IOException {
                super(name);
                this.selector = Selector.open();
                this.id = idx.getAndIncrement() % workers;
            }
    
    
            @Override
            public void run() {
                System.out.println(this.getName() + "start running");
                try {
                    while (true) {
                        // 收集key
                        while (selector.select(50) > 0) {
                            Set<SelectionKey> selectionKeys = selector.selectedKeys();
                            Iterator<SelectionKey> ski = selectionKeys.iterator();
                            while (ski.hasNext()) {
                                SelectionKey selectionKey = ski.next();
                                ski.remove();
    
                                // boss线程会出现
                                if (selectionKey.isAcceptable()) {
                                    acceptHandler(selectionKey);
                                }
                                // worker线程会出现
                                else if (selectionKey.isReadable()) {
                                    readHandler(selectionKey);
                                }
                            }
                        }
    
                        if (!queues[id].isEmpty()) {
                            SocketChannel sc = queues[id].take();
                            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4096);
                            sc.register(this.selector, SelectionKey.OP_READ, byteBuffer);
                            System.out.println(this.getName() + "新客户端:" + sc.getRemoteAddress());
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
            public void acceptHandler(SelectionKey selectionKey) throws IOException {
                ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
                SocketChannel client = ssc.accept();
                client.configureBlocking(false);
    
                // 给小弟的链表添加工作任务
                int num = idx.getAndIncrement() % workers;
                queues[num].add(client);
            }
    
            public void readHandler(SelectionKey selectionKey) throws IOException {
                SocketChannel client = (SocketChannel) selectionKey.channel();
                ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
                buffer.clear();
    
                try {
                    while (true) {
                        int read = client.read(buffer);
                        System.out.println(this.getName() + "read = " + read);
                        if (read > 0) {
                            buffer.flip();
                            while (buffer.hasRemaining()) {
                                // 读到什么东西就写回去什么
                                client.write(buffer);
                            }
                            buffer.clear();
                        } else {
                            break;
                        }
                    }
                } catch (Exception ignore) {
    
                }
            }
        }
    }
    

    在上面这个版本中我们实现了多线程协作处理,其实已经有了netty的雏形,IO笔记至此完。

  • 相关阅读:
    线段树专辑—— pku 1436 Horizontally Visible Segments
    线段树专辑——pku 3667 Hotel
    线段树专辑——hdu 1540 Tunnel Warfare
    线段树专辑—— hdu 1828 Picture
    线段树专辑—— hdu 1542 Atlantis
    线段树专辑 —— pku 2482 Stars in Your Window
    线段树专辑 —— pku 3225 Help with Intervals
    线段树专辑—— hdu 1255 覆盖的面积
    线段树专辑—— hdu 3016 Man Down
    Ajax跨域访问
  • 原文地址:https://www.cnblogs.com/Zs-book1/p/14957387.html
Copyright © 2011-2022 走看看