zoukankan      html  css  js  c++  java
  • NIO和Reactor

    本文参考Doug Lea的Scalable IO in Java.

    网络服务

    随着网络服务的越来越多,我们对网络服务的性能有了更高的要求,提供一个高性能,稳定的web服务是一件很麻烦的事情,所以有了netty框架帮我们完成。

    我们对各种各样的网络服务进行抽象,得到最基本的业务流程:

    1:读取请求信息

    2:对请求信息进行解码

    3:进行相关的业务处理

    4:对处理结果进行编码

    5:发送请求

    看到这,netty的ChannelHandler就是干这几件事情的。

    传统的网络服务

    在jdk1.4之前,我们只有BIO,所以当时的网络服务的架构是这样的。

    每个线程处理一个请求, 由于线程个数和cpu个数的原因,这种设计性能是有上限的,所以就有了集群模式:tomcat集群。

    /**
     * Created by gaoxing on 2015-01-20 .
     */
    public class ClasssicServer {
        public static void main(String[] args) {
            try {
                ServerSocket serverSocket=new ServerSocket(8888,10);
                System.out.println("server is start");
                while( ! Thread.interrupted())
                {
                    new Thread(new Handler(serverSocket.accept())).start();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        static class Handler implements  Runnable
        {
            final Socket socket;
    
            final static int MAX_SIZE=1024;
    
            public Handler(Socket socket)
            {
                this.socket=socket;
            }
    
            @Override
            public void run() {
                //TODO
                //在这里对socket中的数据进行读取和处理,然后把结果写入到socket中去
            }
    
        }
    }

    高性能的IO目标和Reactor

    1:高负载下可以稳定的工作

    2:提高资源的利用率

    3:低延迟

    这有就有了分而治之和事件驱动的思想。这样没有thread就不用瞎跑了,cpu就不用不停的切换Thread . 这样提出了Reactor模式

    1:Reactor接收IO事件发送给该事件的处理器处理

    2:处理器的操作是非阻塞的。

    3:管理事件和处理器的绑定。

    这是一个单线程版本,所有的请求都是一个线程处理,当然Reactor不是无缘无故的提出来的,因为jdk提供了nio包,nio使得Reator得于实现

    1:channels: 通道就是数据源的抽象,通过它可以无阻塞的读取数据

    2:buffers  : 用来装载数据的,可以把从channel读取到buffer中,或者把buffer中的数据写入到channel中

    3:selector : 用来监听 有IO事件,并告诉channel

    4:selectionkeys:  IO事件和处理器绑定

    /**
     * Created by gaoxing on 2015-01-20 .
     */
    public class Reactor implements Runnable {
        final Selector selector ;
        final ServerSocketChannel serverSocketChannel;
    
        public Reactor(int port) throws IOException {
            this.selector=Selector.open();
            this.serverSocketChannel=ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            //一定是非阻塞的,阻塞的一个通道就只能处理一个请求了
            serverSocketChannel.configureBlocking(false);
            //把OP_ACCEPT事件和Acceptor绑定
            SelectionKey sk=serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);
            sk.attach(new Acceptor());
    
        }
    
        @Override
        public void run() {
            while(!Thread.interrupted())
            {
                try {
                    selector.select();//该方法是阻塞的,只有IO事件来了才向下执行
                    Set<SelectionKey> selected=selector.selectedKeys();
                    Iterator<SelectionKey> it=selected.iterator();
                    while(it.hasNext())
                    {
                        dispatch(it.next());
                    }
              selected.clear()           }
    catch (IOException e) { e.printStackTrace(); } } } private void dispatch(SelectionKey next) { Runnable runnable= (Runnable) next.attachment(); if(runnable!=null) { runnable.run(); } } private class Acceptor implements Runnable{ @Override public void run() { SocketChannel channel= null; try { channel = serverSocketChannel.accept(); if (channel!=null){ new Handler(selector,channel); } } catch (IOException e) { e.printStackTrace(); } } } } class Handler implements Runnable { final SelectionKey sk; final SocketChannel channel; final static int MAXSIZE=1024; ByteBuffer input=ByteBuffer.allocate(MAXSIZE); ByteBuffer output=ByteBuffer.allocate(MAXSIZE); static final int READING=0,SENDING=1; int state=READING; public Handler(Selector selector,SocketChannel channel) throws IOException { this.channel=channel; this.channel.configureBlocking(false); /** * 这个有个问题,ppt上register是0, */ sk=this.channel.register(selector,SelectionKey.OP_READ); sk.attach(this); /** * 这里的作用是,设置key的状态为可读,然后让后selector的selector返回。 * 然后就可以处理OP_READ事件了 */ sk.interestOps(SelectionKey.OP_READ); selector.wakeup(); } @Override public void run() { if (state==READING) read(); if (state==SENDING) write(); } void read(){ state=SENDING; sk.interestOps(SelectionKey.OP_WRITE); } void write() { sk.cancel(); } }

     上面代码注解理解有误:

    sk=this.channel.register(selector,0);  这里是初始化一个SelectionKey 不监听事件
    sk.interestOps(SelectionKey.OP_READ); 这里设置监听的事件为OP_READ

    多线程的Reactor模式

    现在的CPU多核的,为了提高对硬件的使用效率需要考虑使用多线程的Reactor设计模式,Reactor主要用来处罚事件的,但是事件的处理会降低Reactor的性能,考虑把事件的处理放到别的线程上来做,有点想android的设计模式,UI线程用来接收用户的事件,事件的处理放到线程去做来提高用户的体验。多线程Reactor有两种一种是Reactor线程只关注io事件,事件处理放到别的线程,一种对事件分类主Reactor只关注Accept事件,子Reactor关注read和write事件。事件处理放到线程去做,这也是netty的设计模式。

    class HandlerPool implements  Runnable
    {
        final SelectionKey sk;
        final SocketChannel channel;
        final static int MAXSIZE=1024;
        ByteBuffer input=ByteBuffer.allocate(MAXSIZE);
        ByteBuffer output=ByteBuffer.allocate(MAXSIZE);
        static ExecutorService executor = Executors.newFixedThreadPool(100);
        static final int READING=0,SENDING=1;
        int state=READING;
    
        public HandlerPool(Selector selector,SocketChannel channel) throws IOException {
            this.channel=channel;
    
            this.channel.configureBlocking(false);
            /**
             * 这个有个问题,这里注册的SelectionKey是不处理的,应该他监听的事件为0
             */
            sk=this.channel.register(selector,0);
            sk.attach(this);
            /**
             * 这里的作用是,SelectionKey的监听事件为OP_READ。interestOps对哪个事件感兴趣
             */
            sk.interestOps(SelectionKey.OP_READ);
            selector.wakeup();
    
        }
        @Override
        public void run() {
            executor.execute(new Processer());
        }
    
        class Processer implements Runnable
        {
    
            @Override
            public void run() {
    
            }
        }
    }

    这个就在Acceptor里面多实例化几个Selector,它处理Read和Write事件。

    大致架构弄懂了。后面边看netty源码,边学习nio

  • 相关阅读:
    js常用代码整理
    java 序列化时排除指定属性
    FastJson bean序列化属性顺序问题
    用logger在控制台打印信息
    UNITY 内存问题资料收集
    数组指针和指针数组的区别
    inl文件介绍
    C++防止文件重复包含
    VS2017 Intelligense C++ 设置的几个重点
    GPU架构图
  • 原文地址:https://www.cnblogs.com/gaoxing/p/4237789.html
Copyright © 2011-2022 走看看