zoukankan      html  css  js  c++  java
  • IO学习笔记7

    2.4 多路复用javaAPI

    在上面我们简单java代码实现了多路复用,是一个单线程版的。讲上面的epoll代码复制到linux服务器中,使用strace追踪系统调用。

    javaAPI会根据系统类型自动选择最优的多路复用器,因此在linux下默认就是使用epoll模型,也可以通过启动参数指定多路复用器模型。

    # 因为默认是epoll,所以系统调用起名为epoll开头
    strace -ff -o epoll java SingletonMultiplexingIO
    # 再指定poll 起名为poll开头的
    strace -ff -o poll java -Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.PollSelectorProvider SingletonMultiplexingIO
    

    执行完上面两条命令后就可以得到如下文件:

    total 2160
    -rw-rw-r-- 1 xz xz   13911 Jun 22 17:13 epoll.32474
    -rw-rw-r-- 1 xz xz  526239 Jun 22 17:13 epoll.32475
    -rw-rw-r-- 1 xz xz   62970 Jun 22 17:13 epoll.32476
    -rw-rw-r-- 1 xz xz    1033 Jun 22 17:13 epoll.32477
    -rw-rw-r-- 1 xz xz    1157 Jun 22 17:13 epoll.32478
    -rw-rw-r-- 1 xz xz    2183 Jun 22 17:13 epoll.32479
    -rw-rw-r-- 1 xz xz   28151 Jun 22 17:13 epoll.32480
    -rw-rw-r-- 1 xz xz   20128 Jun 22 17:13 epoll.32481
    -rw-rw-r-- 1 xz xz    1033 Jun 22 17:13 epoll.32482
    -rw-rw-r-- 1 xz xz 1214432 Jun 22 17:13 epoll.32483
    -rw-rw-r-- 1 xz xz    1935 Jun 22 17:13 epoll.32589
    -rw-rw-r-- 1 xz xz   13661 Jun 22 17:13 poll.32595
    -rw-rw-r-- 1 xz xz    9633 Jun 22 17:13 poll.32596
    -rw-rw-r-- 1 xz xz   13954 Jun 22 17:13 poll.32602
    -rw-rw-r-- 1 xz xz  206806 Jun 22 17:13 poll.32603
    -rw-rw-r-- 1 xz xz    2121 Jun 22 17:13 poll.32604
    -rw-rw-r-- 1 xz xz    1033 Jun 22 17:13 poll.32605
    -rw-rw-r-- 1 xz xz    1157 Jun 22 17:13 poll.32606
    -rw-rw-r-- 1 xz xz    2342 Jun 22 17:13 poll.32607
    -rw-rw-r-- 1 xz xz    6577 Jun 22 17:13 poll.32608
    -rw-rw-r-- 1 xz xz    3897 Jun 22 17:13 poll.32609
    -rw-rw-r-- 1 xz xz    1033 Jun 22 17:13 poll.32610
    -rw-rw-r-- 1 xz xz   10674 Jun 22 17:13 poll.32611
    -rw-rw-r-- 1 xz xz    1935 Jun 22 17:13 poll.32612
    -rw-rw-r-- 1 xz xz    3317 Jun 22 17:07 SingletonMultiplexingIO.class
    -rw-rw-r-- 1 xz xz    3520 Jun 22 17:06 SingletonMultiplexingIO.java
    

    查看epoll.32475:

    ...
    # 首先创建一个socket套接字,得到文件描述符4
    socket(PF_INET, SOCK_STREAM, IPPROTO_IP) = 4
    ...
    # 绑定端口9000到文件描述符4上,并监听文件描述符4
    bind(4, {sa_family=AF_INET, sin_port=htons(9000), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
    listen(4, 20) 
    ...
    # 设置非阻塞
    fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
    ...
    # 打印创建socket成功
    write(1, "create socket 9000 successful", 29) = 29
    # 建立一个管道,占用文件描述符5,6,这两个文件描述符先不管
    ...
    pipe([5, 6])                            = 0
    ...
    # 创建epoll,得到文件描述符7 这个7就是epoll在内核空间开辟的红黑树对应的文件描述符
    epoll_create(256)                       = 7
    ...
    # 调用epoll_ctl,将5添加到epoll中,这个不管
    epoll_ctl(7, EPOLL_CTL_ADD, 5, {EPOLLIN, {u32=5, u64=4613014228473741317}}) = 0
    write(1, "create multiplexing successful", 30) = 30
    ...
    write(1, "register socket to epoll success"..., 35) = 35
    ...
    # 将4通过 epoll_ctl 添加(EPOLL_CTL_ADD)到epoll中。这里注意,我们代码中,
    # 打印: register socket to epoll success 这句话位置是在socket创建完成后,selector.select()代码执行前
    # 但是在实际的调用中,注册4到epoll中在那句话后面打印的,因此实际执行中,在第一次执行 selector.select()时才注册 serverSocket到epoll中,其实是懒加载
    epoll_ctl(7, EPOLL_CTL_ADD, 4, {EPOLLIN, {u32=4, u64=5340325089890009092}}) = 0
    # selector.select(50) 对应的就是下面的epoll_wait
    epoll_wait(7, {}, 8192, 50)             = 0
    epoll_wait(7, {}, 8192, 50)             = 0
    ...
    

    然后再看poll.32596:

    ...
    # 首先创建一个socket套接字,得到文件描述符4
    socket(PF_INET6, SOCK_STREAM, IPPROTO_IP) = 4
    ...
    # 绑定端口9000到文件描述符4上,并监听文件描述符4
    bind(4, {sa_family=AF_INET, sin_port=htons(9000), sin_addr=inet_addr("0.0.0.0")}, 16) = 0
    listen(4, 20)
    ...
    # 设置非阻塞
    fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
    ...
    write(1, "create socket 9000 successful", 29) = 29
    pipe([5, 6])                            = 0
    write(1, "create multiplexing successful", 30) = 30
    write(1, "register socket to epoll success"..., 35) = 35
    poll([{fd=5, events=POLLIN}, {fd=4, events=POLLIN}], 2, 50) = 0 (Timeout)
    

    通过上面两个文件,可以很详细的看出两种方式的系统调用的区别。

    然后我们来看如何使用java代码实现多路复用器:

    单线程多路复用器

    也就是上面我们做测试用的多路复用器:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    /**
     * @author shuai.zhao@going-link.com
     * @date 2021/6/15
     */
    public class SingletonMultiplexingIO {
    
        private Selector selector;
    
        public void init() {
            try {
                // 创建服务端 socket 系统调用
                // socket(...)  = 4
                ServerSocketChannel ssc = ServerSocketChannel.open();
                // bind(4, 9000)
                // listen(4, 20)
                ssc.bind(new InetSocketAddress(9000), 20);
                // 设置为非阻塞
                // fcntl(4, F_SETFL, O_RDWR|O_NONBLOCK)    = 0
                ssc.configureBlocking(false);
                System.out.println("create socket 9000 successful");
    
                // 创建多路复用器, 如果是epoll 则相当于调用 epoll_create
                selector = Selector.open();
                System.out.println("create multiplexing successful");
                // 向多路复用器注册fd
                // 代码虽然写在这里,但是实际调用是在下面第一次调用 selector.select()
                // 原因是懒加载
                ssc.register(selector, SelectionKey.OP_ACCEPT);
                System.out.println("register socket to epoll successful");
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public void start() {
            init();
            System.out.println("服务端启动了");
            while (true) {
                try {
                    // selector.select()
                    // 如果使用 select/poll 模型的话,那么就相当于调用select/poll
                    // 如果使用epoll,就相当于调用epoll_wait
                    while (selector.select(50) > 0) {
                        // 获取有状态的fd
                        // poll的话就是传一堆fd过去,筛选出有状态的fd
                        // epoll就是返回链表中的fd
                        Set<SelectionKey> selectionKeys = selector.selectedKeys();
                        Iterator<SelectionKey> iterator = selectionKeys.iterator();
                        // 迭代处理每一个fd
                        while (iterator.hasNext()) {
                            SelectionKey selectionKey = iterator.next();
                            // 每处理一个fd就从集合中remove掉
                            iterator.remove();
                            if (selectionKey.isAcceptable()) {
                                // 是可接收的fd
                                acceptHandler(selectionKey);
                            } else if (selectionKey.isReadable()) {
                                // 是可读的fd
                                readHandler(selectionKey);
                            }
    //                        else if (selectionKey.isWritable()) {
    //                            暂时先不考虑此状态,但是要知道有这个状态
    //                        }
                        }
                    }
                } catch (Exception ignore) {
                }
            }
        }
    
        public void acceptHandler(SelectionKey selectionKey) throws IOException {
            ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
            SocketChannel client = ssc.accept();
            client.configureBlocking(false);
    
            ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
            // 将这个可读的fd注册到红黑树中
            // epoll_ctl( fd, EPOLL_CTL_ADD...
            client.register(selector, SelectionKey.OP_READ, buffer);
    
            System.out.println("新客户端:" + client.getRemoteAddress());
        }
    
        public void readHandler(SelectionKey selectionKey) throws IOException {
            SocketChannel client = (SocketChannel) selectionKey.channel();
            ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
            buffer.clear();
    
            while (true) {
                int read = client.read(buffer);
                System.out.println("read = " + read);
                if (read > 0) {
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        // 读到什么东西就写回去什么
                        client.write(buffer);
                    }
                    buffer.clear();
                } else if (read == 0) {
                    break;
                } else {
                    client.close();
                }
            }
        }
    
        public static void main(String[] args) {
            SingletonMultiplexingIO smio = new SingletonMultiplexingIO();
            smio.start();
        }
    }
    

    来看上面这种单线程的多路复用器代码,这个版本的实现只有一个线程,也就是说所有的fd都是在一个线程中处理的,这样如果有一个线程执行时间耗时较长的话(在readHandler中处理时间过长)就会导致其他的fd都会被阻塞住往后延迟。因此我们可以很自然的想到,在处理读写事件时,使用多线程去处理。也就是下面这种方式:

    基于多线程的多路复用器V1(version 1)
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.util.Iterator;
    import java.util.Set;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * @author shuai.zhao@going-link.com
     * @date 2021/6/22
     */
    public class MultiplexingIOThreads {
    
        /**
         * 线程池,处理事件
         */
        private ExecutorService executor = Executors.newFixedThreadPool(5);
    
        private Selector selector;
    
        /**
         * 初始化
         */
        private void init() {
            try {
                ServerSocketChannel ssc = ServerSocketChannel.open();
                ssc.bind(new InetSocketAddress(9000), 20);
                ssc.configureBlocking(false);
    
                this.selector = Selector.open();
                ssc.register(this.selector, SelectionKey.OP_ACCEPT);
            } catch (IOException ignore) {
            }
        }
    
        private void start() {
            this.init();
    
            try {
                while (true) {
                    while (selector.select(50) > 0) {
                        Set<SelectionKey> selectionKeys = selector.selectedKeys();
                        Iterator<SelectionKey> ski = selectionKeys.iterator();
                        while (ski.hasNext()) {
                            SelectionKey selectionKey = ski.next();
                            ski.remove();
    
                            if (selectionKey.isAcceptable()) {
                                acceptHandler(selectionKey);
                            } else if (selectionKey.isReadable()) {
                              // selectionKey.cancel() 取消key
                                selectionKey.cancel();
                                System.out.println("**selectionKey cancel**");
                                readHandler(selectionKey);
                            } else if (selectionKey.isWritable()) {
    
                                selectionKey.cancel();
                                writeHandler(selectionKey);
                            }
                        }
                    }
                }
            } catch (Exception ignore) {
            }
        }
    
        private void writeHandler(SelectionKey selectionKey) {
            executor.execute(() -> {
                System.out.println("writeHandler ...");
                SocketChannel sc = (SocketChannel) selectionKey.channel();
                ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
                buffer.flip();
                try {
                    sc.write(buffer);
                } catch (IOException ignore) {
    
                }
                
                // 数据写完,将此fd重新置为可读状态
                buffer.clear();
                try {
                    sc.register(selector, SelectionKey.OP_READ, buffer);
                } catch (ClosedChannelException e) {
                    e.printStackTrace();
                }
            });
        }
    
        private void readHandler(SelectionKey selectionKey) {
            // 另起一个线程处理读事件
            executor.execute(() -> {
                System.out.println("readHandler ...");
                SocketChannel sc = (SocketChannel) selectionKey.channel();
                ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
                buffer.clear();
    
                try {
                    while (true) {
                        // 可以读到数据
                        if (sc.read(buffer) > 0) {
                            // 反转buffer
                            buffer.flip();
                            // 读客户端发送的数据
                            byte[] bytes = new byte[buffer.limit()];
                            buffer.get(bytes);
                            System.out.println("client write: = " + new String(bytes));
    
                            buffer.clear();
                            buffer.put("successful".getBytes());
                            // 读取完数据后,将此链接注册为可写事件
                            sc.register(selector, SelectionKey.OP_WRITE, buffer);
                        } else {
                            break;
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            });
        }
    
        private void acceptHandler(SelectionKey selectionKey) throws IOException {
    
            ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
            SocketChannel sc = ssc.accept();
            sc.configureBlocking(false);
    
            ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
            sc.register(selector, SelectionKey.OP_READ, buffer);
        }
    
        public static void main(String[] args) {
            MultiplexingIOThreads instance = new MultiplexingIOThreads();
            instance.start();
        }
    }
    

    追踪启动上面的进程,然后使用nc连接服务端,主要是为了查看selectionKey.cancel()这个方法的系统调用:

    # readhandler开辟新线程
    clone(child_stack=0x7f5d048d3fb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f5d048d49d0, tls=0x7f5d048d4700, child_tidptr=0x7f5d048d49d0) = 2
    7579
    futex(0x7f5d1404c354, FUTEX_WAIT_PRIVATE, 39, NULL) = 0
    futex(0x7f5d1404c328, FUTEX_WAIT_PRIVATE, 2, NULL) = 0
    futex(0x7f5d1404c328, FUTEX_WAKE_PRIVATE, 1) = 0
    # 下一次执行epoll_wait即select()前先执行删除key操作
    epoll_ctl(7, EPOLL_CTL_DEL, 8, {0, {u32=8, u64=1958317697552875528}}) = 0
    futex(0x7f5d14107154, FUTEX_WAKE_OP_PRIVATE, 1, 1, 0x7f5d14107150, {FUTEX_OP_SET, 0, FUTEX_OP_CMP_GT, 1}) = 1
    futex(0x7f5d14107128, FUTEX_WAKE_PRIVATE, 1) = 0
    # 执行select()
    epoll_wait(7, {}, 8192, 50) 
    

    关于selectionKey.cancel()方法:

    cancel()方法被调用时,会在下一次调用select()方法时将key从红黑树中删除,通过epoll_ctl(7, EPOLL_CTL_DELETE....)删除。

    为什么要调用cancel()

    在之前的版本中,数据的输入和输出操作都是在同一个线程一个方法中进行,我们读完数据立即就执行了写操作,因此当一个方法执行完时,就代表这个key中的数据已经被读完了,但是现在使用多线程处理读写操作,因此可能出现的情况就是:当启动了一个子线程,但是因为需要读的数据非常大,比较耗时,子线程还在读数据是,主线程进入了下一次循环,仍然找到了这个fd,那么就会重新开劈子进程,重新读取fd数据,出现重复读数据的情况。因此当使用这种多线程读时,就需要调用cancel()去删除这个key,这样下次就不会再读到它了。

    但是因为

    1. 线程池大小难控制,多线程间线程切换会影响效率,多线程会有线程安全性问题

    2. cancel()实际调用的是epoll_ctl(DELETE)也是属于系统调用,IO模型的进化 BIO->NIO->多路复用,本就是为了减少 系统调用,因此这种方式违背了初心,不可取。

    所以就进化出了新的多线程模型。

    基于多线程的多路复用器V2

    由于上面版本的多线程的问题,提出的解决方案就是:

    开辟多个selector,每一个selector内部是单线程的,多个selector是并行的。同时利用了多线程和单线程的优点。

    代码实现:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    /**
     * @author shuai.zhao@going-link.com
     * @date 2021/6/30
     */
    public class MultiplexingIOThreads_V2 {
        
        private Selector selector1;
        private Selector selector2;
        private Selector selector3;
    
        void init() {
    
            try {
                ServerSocketChannel ssc = ServerSocketChannel.open();
                ssc.bind(new InetSocketAddress(9000));
                ssc.configureBlocking(false);
    
                selector1 = Selector.open();
                selector2 = Selector.open();
                selector3 = Selector.open();
                
                ssc.register(selector1, SelectionKey.OP_ACCEPT);
                ssc.register(selector2, SelectionKey.OP_ACCEPT);
                ssc.register(selector3, SelectionKey.OP_ACCEPT);
    
            } catch (IOException ignore) {
            }
        }
    
        public static void main(String[] args) {
            MultiplexingIOThreads_V2 mt_v2 = new MultiplexingIOThreads_V2();
            mt_v2.init();
    
            new SelectorThread(mt_v2.selector1, "selector1").start();
            new SelectorThread(mt_v2.selector2, "selector2").start();
            new SelectorThread(mt_v2.selector3, "selector3").start();
        }
    
        /**
         * 每一个selector对应一个单线程
         */
        static class SelectorThread extends Thread{
    
            private Selector selector;
    
            public SelectorThread(Selector selector, String threadName) {
                super(threadName);
                this.selector = selector;
            }
    
            @Override
            public void run() {
                try {
                    while (true) {
                        while (selector.select(50) > 0) {
                            Set<SelectionKey> selectionKeys = selector.selectedKeys();
                            Iterator<SelectionKey> ski = selectionKeys.iterator();
                            while (ski.hasNext()) {
                                SelectionKey selectionKey = ski.next();
                                ski.remove();
    
                                if (selectionKey.isAcceptable()) {
                                    acceptHandler(selectionKey);
                                } else if (selectionKey.isReadable()) {
                                    readHandler(selectionKey);
                                }
                            }
                        }
                    }
                } catch (Exception ignore) {
                }
            }
    
            public void acceptHandler(SelectionKey selectionKey) throws IOException {
                ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
                SocketChannel client = ssc.accept();
                client.configureBlocking(false);
    
                ByteBuffer buffer = ByteBuffer.allocateDirect(4096);
                // 将这个可读的fd注册到红黑树中
                // epoll_ctl( fd, EPOLL_CTL_ADD...
                client.register(selector, SelectionKey.OP_READ, buffer);
    
                System.out.println(this.getName() + "新客户端:" + client.getRemoteAddress());
            }
    
            public void readHandler(SelectionKey selectionKey) throws IOException {
                SocketChannel client = (SocketChannel) selectionKey.channel();
                ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
                buffer.clear();
    
                while (true) {
                    int read = client.read(buffer);
                    System.out.println("read = " + read);
                    if (read > 0) {
                        buffer.flip();
                        while (buffer.hasRemaining()) {
                            // 读到什么东西就写回去什么
                            client.write(buffer);
                        }
                        buffer.clear();
                    } else {
                        break;
                    }
                }
            }
        }
    }
    

    首先看这一版本的多线程实现。多个selector线程共同处理客户端的连接请求。通过前面的C10K去验证此服务端。会发现一个问题:

    因为多个线程的优先级是一样的,因此在客户端连接进来时,是随机分配给每个selector的,一个是不可控问题,可能有的selector注册的特别多,特别忙,有的selector却很闲。我们希望是多个selector协同处理所有的fd。因此我们做一下改动:

    package com.gouxiazhi.io;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.Scanner;
    import java.util.Set;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @author shuai.zhao@going-link.com
     * @date 2021/6/30
     */
    public class MultiplexingIOThreads_V3 {
    
        private Selector selector1;
    
        void init() {
    
            try {
                ServerSocketChannel ssc = ServerSocketChannel.open();
                ssc.bind(new InetSocketAddress(9000));
                ssc.configureBlocking(false);
    
                selector1 = Selector.open();
    
                ssc.register(selector1, SelectionKey.OP_ACCEPT);
    
            } catch (IOException ignore) {
            }
        }
    
        public static void main(String[] args) throws IOException {
            MultiplexingIOThreads_V3 mt_v3 = new MultiplexingIOThreads_V3();
            mt_v3.init();
    
            new SelectorThread("boss", 2, mt_v3.selector1).start();
            new SelectorThread("worker1").start();
            new SelectorThread("worker2").start();
        }
    
        /**
         * 每一个selector对应一个单线程
         */
        static class SelectorThread extends Thread {
    
            /**
             * 统计当前boss所接收的客户端总数
             */
            private static AtomicInteger idx = new AtomicInteger();
            /**
             * 下边有几个小弟
             */
            private static int workers;
            /**
             * 一个小弟分一个任务链
             */
            private volatile static LinkedBlockingQueue<SocketChannel>[] queues;
            /**
             * 干活的小弟
             */
            private Selector selector;
            /**
             * 小弟编号
             */
            private int id;
    
            /**
             * boss线程构造器
             *
             * @param name     线程名
             * @param workers  小弟数量
             * @param selector boss
             */
            public SelectorThread(String name, int workers, Selector selector) {
                super(name);
                this.selector = selector;
                SelectorThread.workers = workers;
                queues = new LinkedBlockingQueue[workers];
                for (int i = 0; i < workers; i++) {
                    queues[i] = new LinkedBlockingQueue<>();
                }
            }
    
            public SelectorThread(String name) throws IOException {
                super(name);
                this.selector = Selector.open();
                this.id = idx.getAndIncrement() % workers;
            }
    
    
            @Override
            public void run() {
                System.out.println(this.getName() + "start running");
                try {
                    while (true) {
                        // 收集key
                        while (selector.select(50) > 0) {
                            Set<SelectionKey> selectionKeys = selector.selectedKeys();
                            Iterator<SelectionKey> ski = selectionKeys.iterator();
                            while (ski.hasNext()) {
                                SelectionKey selectionKey = ski.next();
                                ski.remove();
    
                                // boss线程会出现
                                if (selectionKey.isAcceptable()) {
                                    acceptHandler(selectionKey);
                                }
                                // worker线程会出现
                                else if (selectionKey.isReadable()) {
                                    readHandler(selectionKey);
                                }
                            }
                        }
    
                        if (!queues[id].isEmpty()) {
                            SocketChannel sc = queues[id].take();
                            ByteBuffer byteBuffer = ByteBuffer.allocateDirect(4096);
                            sc.register(this.selector, SelectionKey.OP_READ, byteBuffer);
                            System.out.println(this.getName() + "新客户端:" + sc.getRemoteAddress());
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
            public void acceptHandler(SelectionKey selectionKey) throws IOException {
                ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
                SocketChannel client = ssc.accept();
                client.configureBlocking(false);
    
                // 给小弟的链表添加工作任务
                int num = idx.getAndIncrement() % workers;
                queues[num].add(client);
            }
    
            public void readHandler(SelectionKey selectionKey) throws IOException {
                SocketChannel client = (SocketChannel) selectionKey.channel();
                ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
                buffer.clear();
    
                try {
                    while (true) {
                        int read = client.read(buffer);
                        System.out.println(this.getName() + "read = " + read);
                        if (read > 0) {
                            buffer.flip();
                            while (buffer.hasRemaining()) {
                                // 读到什么东西就写回去什么
                                client.write(buffer);
                            }
                            buffer.clear();
                        } else {
                            break;
                        }
                    }
                } catch (Exception ignore) {
    
                }
            }
        }
    }
    

    在上面这个版本中我们实现了多线程协作处理,其实已经有了netty的雏形,IO笔记至此完。

  • 相关阅读:
    在python中处理XML
    正则表达式
    python 反射
    python模块概况,json/pickle,time/datetime,logging
    python装饰器
    python3内置函数详解
    python函数,lambda表达式,三目运算,列表解析,递归
    python数据类型及其常用方法
    python3的文件操作
    初识Docker
  • 原文地址:https://www.cnblogs.com/Zs-book1/p/14957387.html
Copyright © 2011-2022 走看看