zoukankan      html  css  js  c++  java
  • NIO使用Reactor模式遇到的问题

    关于Reactor模式,不再多做介绍,推荐Doug Lea大神的教程:Java 可扩展的IO

    本来在Reactor的构造方法中完成一系列操作是没有问题的:

    public class Reactor implements Runnable {
    
        private final Selector selector;
    
        public Reactor() throws IOException {
            selector = Selector.open();
            String host = "127.0.0.1";
            int port = 12345;
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            serverChannel.socket().bind(new InetSocketAddress(host, port));
            serverChannel.configureBlocking(false);
            serverChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(serverChannel, selector));
        }
    
        @Override
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    selector.select();
                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> it = selected.iterator();
                    while (it.hasNext()) {
                        dispatch(it.next());
                    }
                    selected.clear();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void dispatch(SelectionKey next) {
            Runnable r = (Runnable) next.attachment();
            if (r != null) {
                r.run();
            }
        }
    
        public void registerChannel(SelectableChannel channel, int ops) throws IOException {
            if (channel instanceof ServerSocketChannel) {
                ServerSocketChannel socketChannel = (ServerSocketChannel) channel;
                channel.register(selector, ops, new Acceptor(socketChannel, selector));
            }
        }
    }

    然而有些参数需要在外层操作,我想这样弄:

    public Reactor() throws IOException {
        selector = Selector.open();
    }
    

    在主线程中启动server [reactorManager.start()来启动Reactor线程]

    private void startReactorManager() throws IOException {
        reactorManager = new ReactorManager();
        reactorManager.start();
    }
    
    private void startNIOServer(ServerConfig serverConfig) throws IOException {
        String host = serverConfig.getHost();
        int port = serverConfig.getTcpPort();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.socket().bind(new InetSocketAddress(host, port));
        serverChannel.configureBlocking(false);
        reactorManager.registerChannel(serverChannel, SelectionKey.OP_ACCEPT);
    }

    也就是先启动一个线程来Selector.open();然后在主线程中注册通道和事件。

    结果一直无法监听到客户端的连接,跟踪才发现服务端的注册方法阻塞了,原因是锁的问题,具体还不清楚。

    这里留下一个疑问

    想一想,既然在构造函数中可以注册,放到main线程中却不行,那么是否我们可以在注册时检查,如果this是当前Reactor线程,就直接注册,这跟在构造函数中没有区别。

    如果不是,就放到队列中,当然你这时放了,selector.select()一直阻塞着,你也无法取出来注册,那么我们可以利用selector.wakeup()唤醒它。

    新的方案如下:

    public class Reactor extends Thread {
    
        private final Selector selector;
        private LinkedBlockingQueue<Object[]> register = new LinkedBlockingQueue<>() ;//channel、ops、attach
        private final AtomicBoolean wakeup = new AtomicBoolean() ;
    
        public Reactor() throws IOException {
            selector = Selector.open();
        }
    
        @Override
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    wakeup.set(false);
                    processRegister();
                    selector.select();
                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> it = selected.iterator();
                    while (it.hasNext()) {
                        dispatch(it.next());
                    }
                    selected.clear();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void processRegister() {
            Object[] object;
            while ((object = this.register.poll()) != null) {
                try {
                    SelectableChannel channel = (SelectableChannel) object[0];
                    if (!channel.isOpen())
                        continue;
                    int ops = ((Integer) object[1]).intValue();
                    Object attachment = object[2];
                    channel.register(this.selector, ops, attachment);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void dispatch(SelectionKey next) {
            Runnable r = (Runnable) next.attachment();
            if (r != null) {
                r.run();
            }
        }
    
        public void registerChannel(SelectableChannel channel, int ops) throws IOException {
            ServerSocketChannel serverChannel = null;
            if (channel instanceof ServerSocketChannel) {
                serverChannel = (ServerSocketChannel) channel;
            }
            Object attachment = new Acceptor(serverChannel, selector);
            if (this == Thread.currentThread()) {
                serverChannel.register(selector, ops, attachment);
            } else {
                this.register.offer(new Object[]{ channel, ops, attachment });
                if (wakeup.compareAndSet(false, true)) {
                    this.selector.wakeup();
                }
            }
        }
    }

    这样就解决了问题,在此记录下!

    ================================== 赵客缦胡缨,吴钩霜雪明。 银鞍照白马,飒沓如流星。 ==================================
  • 相关阅读:
    亿级 Web 系统搭建:单机到分布式集群
    机器学习14种常见算法
    Nginx——使用 Nginx 提升网站访问速度【转载+整理】
    全栈开发工程师,就是个神话~【转载+整理】
    谷歌、亚马逊相继宣布屏蔽 Flash 广告,又一个时代行将结束?【转载+整理】
    前端框架现状调查【转载+整理】
    Swift 编程语言【转载+整理】
    如何将 Java 项目转换成 Maven 项目
    Eclipse 4.5.0 离线安装 Veloeclipse 插件
    Java 8 新特性——Lambdas 表达式
  • 原文地址:https://www.cnblogs.com/lucare/p/9312656.html
Copyright © 2011-2022 走看看