zoukankan      html  css  js  c++  java
  • NIO使用Reactor模式遇到的问题

    关于Reactor模式,不再多做介绍,推荐Doug Lea大神的教程:Java 可扩展的IO

    本来在Reactor的构造方法中完成一系列操作是没有问题的:

    public class Reactor implements Runnable {
    
        private final Selector selector;
    
        public Reactor() throws IOException {
            selector = Selector.open();
            String host = "127.0.0.1";
            int port = 12345;
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            serverChannel.socket().bind(new InetSocketAddress(host, port));
            serverChannel.configureBlocking(false);
            serverChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(serverChannel, selector));
        }
    
        @Override
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    selector.select();
                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> it = selected.iterator();
                    while (it.hasNext()) {
                        dispatch(it.next());
                    }
                    selected.clear();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void dispatch(SelectionKey next) {
            Runnable r = (Runnable) next.attachment();
            if (r != null) {
                r.run();
            }
        }
    
        public void registerChannel(SelectableChannel channel, int ops) throws IOException {
            if (channel instanceof ServerSocketChannel) {
                ServerSocketChannel socketChannel = (ServerSocketChannel) channel;
                channel.register(selector, ops, new Acceptor(socketChannel, selector));
            }
        }
    }

    然而有些参数需要在外层操作,我想这样弄:

    public Reactor() throws IOException {
        selector = Selector.open();
    }
    

    在主线程中启动server [reactorManager.start()来启动Reactor线程]

    private void startReactorManager() throws IOException {
        reactorManager = new ReactorManager();
        reactorManager.start();
    }
    
    private void startNIOServer(ServerConfig serverConfig) throws IOException {
        String host = serverConfig.getHost();
        int port = serverConfig.getTcpPort();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.socket().bind(new InetSocketAddress(host, port));
        serverChannel.configureBlocking(false);
        reactorManager.registerChannel(serverChannel, SelectionKey.OP_ACCEPT);
    }

    也就是先启动一个线程来Selector.open();然后在主线程中注册通道和事件。

    结果一直无法监听到客户端的连接,跟踪才发现服务端的注册方法阻塞了,原因是锁的问题,具体还不清楚。

    这里留下一个疑问

    想一想,既然在构造函数中可以注册,放到main线程中却不行,那么是否我们可以在注册时检查,如果this是当前Reactor线程,就直接注册,这跟在构造函数中没有区别。

    如果不是,就放到队列中,当然你这时放了,selector.select()一直阻塞着,你也无法取出来注册,那么我们可以利用selector.wakeup()唤醒它。

    新的方案如下:

    public class Reactor extends Thread {
    
        private final Selector selector;
        private LinkedBlockingQueue<Object[]> register = new LinkedBlockingQueue<>() ;//channel、ops、attach
        private final AtomicBoolean wakeup = new AtomicBoolean() ;
    
        public Reactor() throws IOException {
            selector = Selector.open();
        }
    
        @Override
        public void run() {
            while (!Thread.interrupted()) {
                try {
                    wakeup.set(false);
                    processRegister();
                    selector.select();
                    Set<SelectionKey> selected = selector.selectedKeys();
                    Iterator<SelectionKey> it = selected.iterator();
                    while (it.hasNext()) {
                        dispatch(it.next());
                    }
                    selected.clear();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void processRegister() {
            Object[] object;
            while ((object = this.register.poll()) != null) {
                try {
                    SelectableChannel channel = (SelectableChannel) object[0];
                    if (!channel.isOpen())
                        continue;
                    int ops = ((Integer) object[1]).intValue();
                    Object attachment = object[2];
                    channel.register(this.selector, ops, attachment);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void dispatch(SelectionKey next) {
            Runnable r = (Runnable) next.attachment();
            if (r != null) {
                r.run();
            }
        }
    
        public void registerChannel(SelectableChannel channel, int ops) throws IOException {
            ServerSocketChannel serverChannel = null;
            if (channel instanceof ServerSocketChannel) {
                serverChannel = (ServerSocketChannel) channel;
            }
            Object attachment = new Acceptor(serverChannel, selector);
            if (this == Thread.currentThread()) {
                serverChannel.register(selector, ops, attachment);
            } else {
                this.register.offer(new Object[]{ channel, ops, attachment });
                if (wakeup.compareAndSet(false, true)) {
                    this.selector.wakeup();
                }
            }
        }
    }

    这样就解决了问题,在此记录下!

    ================================== 赵客缦胡缨,吴钩霜雪明。 银鞍照白马,飒沓如流星。 ==================================
  • 相关阅读:
    关于 Python 对象拷贝的那点事?
    痞子衡嵌入式:嵌入式从业者应知应会知识点
    痞子衡嵌入式:高性能MCU之人工智能物联网应用开发那些事
    痞子衡嵌入式:恩智浦i.MX RTxxx系列MCU特性那些事(2)- RT685SFVK性能实测(Dhrystone)
    痞子衡嵌入式:微处理器CPU性能测试基准(Dhrystone)
    痞子衡嵌入式:如果你正在量产i.MX RT产品,不妨试试这款神器RT-Flash
    痞子衡嵌入式:飞思卡尔i.MX RT系列MCU量产神器RT-Flash常见问题
    痞子衡嵌入式:飞思卡尔i.MX RT系列MCU量产神器RT-Flash用户指南
    痞子衡嵌入式:如果i.MX RT是一匹悍马,征服它时别忘了用马镫MCUBootUtility
    痞子衡嵌入式:超级好用的可视化PyQt GUI构建工具(Qt Designer)
  • 原文地址:https://www.cnblogs.com/lucare/p/9312656.html
Copyright © 2011-2022 走看看