zoukankan      html  css  js  c++  java
  • 多路多线程 reactor 模型的实现

    /**
     * @Author Niuxy
     * @Date 2020/6/10 9:42 下午
     * @Description 多 selector 多线程的 NIO 服务端
     * 使用 NIO 时一定要摒弃 BIO 的阻塞思维,我们的代码面向的是事件,而不是连接
     * 至于多次事件完成一个连接的情况,我们可以通过 attachment 记录该连接上次事件处理的结果。
     * 上面做法的前提是一个连接只允许注册一个感兴趣的事件。
     */
    public class CurrentReactor implements Runnable {
        // CPU 核心数
        int cpuNums = Runtime.getRuntime().availableProcessors() + 3;
        // selector 数
        int selectorNums;
        // 监听读写事件的循环
        Selector[] selectorArr;
        // 事件处理线程池
        ExecutorService executorService;
        ServerSocketChannel serverSocketChannel;
        //当前使用的 selector 坐标
        Integer currentSelector;
    
    
        CurrentReactor(ServerSocketChannel serverSocketChannel) {
            this.serverSocketChannel = serverSocketChannel;
            //读写事件从 1 开始使用,第一个 selector 用于监听连接事件
            currentSelector = 1;
            try {
                this.selectorNums = 3;
                selectorArr = new Selector[3];
                // 四核以上服务器较好
                executorService = Executors.newFixedThreadPool(cpuNums + selectorNums);
                for (int i = 0; i < selectorNums; i++) {
                    selectorArr[i] = SelectorProvider.provider().openSelector();
                }
                // 注册 server 连接事件
                SelectionKey key = serverSocketChannel.register(selectorArr[0], SelectionKey.OP_ACCEPT);
                key.attach(new Acceptor());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        //事件分发器
        private void dispatch(SelectionKey key) {
            if (key == null || key.attachment() == null) {
                return;
            }
            try {
                executorService.execute((Runnable) key.attachment());
            } catch (Exception e) {
                //任务提交异常则什么都不做,因为 NIO 的水平触发机制会继续触发事件
                e.printStackTrace();
            }
        }
    
        @Override
        public void run() {
            for (int i = 0; i < 3; i++) {
                Selector selector = selectorArr[i];
                executorService.execute(
                        () -> {
                            while (!Thread.currentThread().isInterrupted()) {
                                try {
                                    selector.select(100);
                                    Set<SelectionKey> keys = selector.selectedKeys();
                                    Iterator iterator = keys.iterator();
                                    while (iterator.hasNext()) {
                                        SelectionKey key = (SelectionKey) iterator.next();
                                        iterator.remove();
                                        dispatch(key);
                                    }
                                } catch (Exception e) {
                                    e.printStackTrace();
                                }
                            }
                        }
                );
            }
        }
    
        /**
         * @Author Niuxy
         * @Date 2020/6/10 9:01 下午
         * @Description 连接事件处理器
         */
        class Acceptor implements Runnable {
            Selector handlerSelector;
    
            Acceptor() {
                // 采用轮询的负载均衡策略选取 selector
                synchronized (currentSelector) {
                    if (currentSelector == 3) {
                        currentSelector = 1;
                    }
                    this.handlerSelector = selectorArr[currentSelector];
                    currentSelector++;
                }
            }
    
            @Override
            public void run() {
                try {
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    if (socketChannel == null) {
                        return;
                    }
                    socketChannel.configureBlocking(false);
                    SelectionKey key = socketChannel.register(this.handlerSelector, SelectionKey.OP_READ);
                    //报文接收策略,与连接一一绑定
                    MessageHandler messageHandler = new PrintMessageHandlerImpl();
                    // register 与 select 方法竞争锁,防止 register 被 select 阻塞
                    this.handlerSelector.wakeup();
                    ReciveRegister reciveRegister = new HLRegisterImpl(2, messageHandler);
                    //注册 key 的同时 将事件处理的 "回调" 函数绑定到 key 上
                    key.attach(new Handler(socketChannel, key, reciveRegister));
                } catch (ClosedChannelException ce) {
                    ce.printStackTrace();
                    //to do
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * @Author Niuxy
         * @Date 2020/6/10 8:50 下午
         * @Description 就绪读写事件处理器。粗暴的将方法全部加锁,一个连接不应该有多个线程同时处理,但
         * Reactor 模式下不同的事件提交线程池后可能造成多个线程处理同一个链接
         * 不能依靠 key 的 Readable 或 Writeable 状态决定当前是读是写,读写应当由完整的请求进行分割,一读一写,再处理下次请求
         * 因此需要自己维护读写状态位
         */
        class Handler implements Runnable {
            public static final int READING = 0, WRITING = 1;
            volatile SocketChannel socketChannel;
            volatile SelectionKey key;
            /**
             * @Author Niuxy
             * @Date 2020/6/2 9:29 下午
             * @Description 在响应上一个请求前,我们不希望处理下一个请求,因此在 Handler 维护一个状态位,标识目前应当
             * 处理读事件还是写事件
             * 我们必须保证接收和回复的顺序性,保证客户端可以对响应做出正确的处理
             * 当然也有其它的处理方式,我们将响应数据装入一个有序队列,并顺序的处理这些响应。或者通过令牌将请求和响应
             * 进行对应。
             */
            int state = READING;
            ReciveRegister reciveRegister;
            String readResult = null;
            ByteBuffer writeBuffer = null;
    
            Handler(SocketChannel channel, SelectionKey key, ReciveRegister reciveRegister) {
                /**
                 * @Author Niuxy
                 * @Date 2020/6/4 9:39 下午
                 * @Description 重要!必须保证构造方法与其它方法的互斥
                 *              否则可能造成构造方法没有执行完,其它线程已开始执行该对象的其它方法
                 *              场景不容易复现,但在大剂量请求到达时 Reactor 模式中时很容易出现
                 */
                synchronized (this) {
                    this.socketChannel = channel;
                    this.key = key;
                    this.reciveRegister = reciveRegister;
                }
            }
    
            @Override
            public synchronized void run() {
                try {
                    if (state == READING) {
                        read();
                    } else {
                        write();
                    }
                } catch (RuntimeException rex) {
                    throw rex;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
            private synchronized void read() throws Exception {
                String re = reciveRegister.doRecive(socketChannel);
                if (re != null && re != "") {
                    readResult = re;
                    state = WRITING;
                    key.interestOps(SelectionKey.OP_WRITE);
                }
            }
    
            private synchronized void write() throws IOException {
                if (this.readResult == null || readResult == "") {
                    return;
                }
                //如果不是第一次触发写事件,接着上次的写
                if (writeBuffer == null) {
                    writeBuffer = ByteBuffer.wrap(this.readResult.getBytes());
                }
    
                //该循环处理发送缓冲区处理速度小于网卡发送速度,无法一次性将 buffer 中数据写入发送缓冲区的情况
                socketChannel.write(writeBuffer);
                if (writeBuffer.position() != writeBuffer.limit()) {
                    return;
                }
                writeBuffer = null;
                readResult = null;
                state = READING;
                //写完将兴趣移除,否则会将 CPU 跑满
                key.interestOps(SelectionKey.OP_READ);
            }
        }
    }
  • 相关阅读:
    模式识别之svm()---支持向量机svm 简介1995
    机器视觉之相关资源--- 计算机视觉相关的部分测试数据集和源码站点
    包的引入,不要引入自己目录下的。
    内省对象 用的少,被BeanUtils代替
    使用增强for循环遍历集合的时候操作集合的问题?
    Integer 内部实现
    eclipse常用快捷键
    java 运行时环境和编译器环境
    js 随机变换图片
    js 事件点击 显示 隐藏
  • 原文地址:https://www.cnblogs.com/niuyourou/p/13113685.html
Copyright © 2011-2022 走看看