  • Reactor Pattern and NIO


    1. Introduction

    2. The Reactor pattern

    3. Applying the Reactor pattern

    1. Introduction

    Redis server: the Redis server is an event-driven program that mainly handles file events and time events.

    Redis builds its network event handler on the Reactor pattern.

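    2. The Reactor pattern

    2.1 Definition of the Reactor pattern

    The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.

    1. The Reactor pattern is an event-driven pattern.
    2. It handles service requests that arrive concurrently from one or more inputs.
    3. It demultiplexes the incoming requests and dispatches them synchronously to the associated handlers.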
    

    2.2 Network programming models

    Reactor, Proactor, Asynchronous Completion Token, and Acceptor-Connector

    See the appendix for a basic socket programming implementation.

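    2.3 Common Reactor components

    • Dispatcher: calls the matching EventHandler according to the Event type
    • Handle: a resource managed by the operating system
    • EventHandler: the event handler
    • Synchronous Event Demultiplexer: blocks until Events occur on the registered Handles

    • Reactor pattern sequence diagram

    • Reactor call flow
    initiate => receive => demultiplex => dispatch => process events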
    

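    1. Initialize the Dispatcher.

    2. Register EventHandlers with the Dispatcher; each EventHandler holds a reference to its corresponding Handle.

    3. Start the event loop and call select(); the Synchronous Event Demultiplexer blocks until an Event occurs.

    4. When an Event occurs on one or more Handles, select() returns; the Dispatcher uses the returned Event to look up the registered EventHandler and calls its handle_events() method.

    5. The EventHandler's handle_events() method may also register new EventHandlers with the Dispatcher to handle the next Event.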
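
    The five steps above can be sketched without any real I/O. The following is a minimal in-memory illustration in Java, not the interface used by Redis or java.nio: the names MiniDispatcher, EventHandler.handleEvent, post, and MiniReactorDemo are hypothetical, handles are plain ints, and a queue stands in for the synchronous event demultiplexer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical handler interface: one callback per ready handle.
interface EventHandler {
    void handleEvent(int handle, MiniDispatcher dispatcher);
}

class MiniDispatcher {
    private final Map<Integer, EventHandler> handlers = new HashMap<>();
    // Stands in for the synchronous event demultiplexer (select()).
    private final Queue<Integer> readyHandles = new ArrayDeque<>();

    // Step 2: register a handler for a handle.
    void registerHandler(int handle, EventHandler handler) {
        handlers.put(handle, handler);
    }

    void removeHandler(int handle) {
        handlers.remove(handle);
    }

    // Simulates the operating system reporting an event on a handle.
    void post(int handle) {
        readyHandles.add(handle);
    }

    // Steps 3 and 4: take each ready handle and call its registered handler.
    // Unlike a real event loop, this returns once no events are pending.
    void handleEvents() {
        Integer handle;
        while ((handle = readyHandles.poll()) != null) {
            EventHandler h = handlers.get(handle);
            if (h != null) {
                h.handleEvent(handle, this);
            }
        }
    }
}

class MiniReactorDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniDispatcher d = new MiniDispatcher();            // step 1
        d.registerHandler(1, (handle, disp) -> {            // step 2
            log.add("accept on " + handle);
            // Step 5: a handler registers another handler for the next event.
            disp.registerHandler(2, (h2, d2) -> log.add("read on " + h2));
            disp.post(2);
        });
        d.post(1);
        d.post(3);          // no handler registered for handle 3: ignored
        d.handleEvents();   // steps 3 and 4
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints [accept on 1, read on 2]
    }
}
```

    The handler for handle 1 plays the role of an acceptor: when it runs, it registers a second handler, which is how a new connection's read handler is normally added in a Reactor.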

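    • Reactor event-management interface
    #include <sys/time.h>   // struct timeval

    class EventHandler;     // handler interface, declared separately

    class Reactor {
    public:
        // Register pHandler for the given event type.
        int register_handler(EventHandler *pHandler, int event);

        // Stop dispatching the given event type to pHandler.
        int remove_handler(EventHandler *pHandler, int event);

        // Wait for events (ptv is the timeout) and dispatch them.
        void handle_events(timeval *ptv);
    };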
    
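    • EventHandler: the event handler interface
    typedef int HANDLE;     // assumption: a handle is a file descriptor

    class EventHandler {
    public:
        virtual ~EventHandler() {}

        // events may be read/write/timeout/close, etc.
        virtual void handle_events(int events) = 0;

        // Returns the handle this handler is bound to.
        virtual HANDLE get_handle() = 0;
    };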
    

    3. Applying the Reactor pattern

    3.1 Multithreaded I/O model

    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    
    /**
     * Classic ServerSocket Loop
     *
     * Traditional multithreaded blocking I/O: each thread handles one request
     *
     * @Author deng shuo
     * @Date 5/26/21 23:09
     * @Version 1.0
     */
    public class ClassicServerModel implements Runnable{
    
        private static final int PORT = 8085;
        private static final int MAX_INPUT = 1024 *1024;
    
        @Override
        public void run() {
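            // try-with-resources closes the listening socket when the loop exits
            try (ServerSocket ss = new ServerSocket(PORT)) {
                while (!Thread.interrupted()) {
                    /* a thread pool could be used here instead */
                    new Thread(new Handler(ss.accept())).start();
                }
            } catch (IOException ex) {
                /* handle exception */
                ex.printStackTrace();
            }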
        }
    
        static class Handler implements Runnable{
            final Socket socket;
            Handler(Socket s){socket = s;}
    
            @Override
            public void run() {
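                // try-with-resources closes the connection once the reply is sent
                try (Socket s = socket) {
                    byte[] input = new byte[MAX_INPUT];
                    // a single read may return fewer bytes than the full request
                    s.getInputStream().read(input);

                    byte[] output = process(input);
                    s.getOutputStream().write(output);
                } catch (IOException ex) {
                    /* handle exception */
                    ex.printStackTrace();
                }
            }

            private byte[] process(byte[] cmd) {
                /* process the input data in cmd */

                byte[] output = new byte[MAX_INPUT];
                return output;
            }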
        }
    }
    
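    • Advantage: greatly improves server throughput; a read that blocks on one connection does not hold up later requests
    • Disadvantage: high resource requirements; creating and destroying a thread per connection is costly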
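
    The thread creation cost noted above is what the "thread pool" comment in the listing refers to. Below is a minimal sketch of that variant, assuming a fixed-size pool and a simple echo in place of process(); the class name PooledEchoServer, its port() helper, and the echo behaviour are illustrative assumptions, not part of the original code.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

// Same blocking model as ClassicServerModel, but handlers run on pooled threads.
class PooledEchoServer implements Runnable, AutoCloseable {
    private final ServerSocket serverSocket;
    private final ExecutorService pool;

    PooledEchoServer(int port, int threads) throws IOException {
        serverSocket = new ServerSocket(port);          // port 0 picks a free port
        pool = Executors.newFixedThreadPool(threads);   // threads are reused
    }

    int port() {
        return serverSocket.getLocalPort();
    }

    @Override
    public void run() {
        try {
            while (!serverSocket.isClosed()) {
                Socket socket = serverSocket.accept();  // blocks until a client connects
                pool.execute(() -> handle(socket));     // no new thread per connection
            }
        } catch (IOException | RejectedExecutionException ex) {
            // accept() fails once close() has been called; the loop then ends
        }
    }

    // Reads one chunk from the client and writes it back.
    private static void handle(Socket socket) {
        try (Socket s = socket) {
            byte[] buf = new byte[1024];
            InputStream in = s.getInputStream();
            int n = in.read(buf);
            if (n > 0) {
                s.getOutputStream().write(buf, 0, n);
            }
        } catch (IOException ex) {
            ex.printStackTrace();
        }
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
        pool.shutdown();
    }
}
```

    A pool bounds the number of threads, but each pooled thread still blocks in read() for the lifetime of its connection, so slow clients can still occupy every worker. That limitation is the motivation for the non-blocking model in the next section.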

    3.2 Single-threaded Reactor model

    Divide-and-conquer is usually the best approach for achieving any scalability goal.

    java.nio non-blocking I/O as an implementation of the Reactor pattern

    NIO concepts:

    • Channels: connections to files, sockets, etc. that support non-blocking reads
    • Buffers: array-like objects that can be directly read or written by Channels
    • Selectors: tell which of a set of Channels have I/O events
    • SelectionKeys: maintain I/O event status and bindings
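
    Of these concepts, Buffers are the easiest to misuse, because a ByteBuffer must be flipped between filling and draining. The short sketch below shows that cycle using only java.nio.ByteBuffer; the class name BufferFlipDemo and the 64-byte capacity are arbitrary choices for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

class BufferFlipDemo {
    // Writes text into a buffer, flips it, and reads the same bytes back out.
    // Assumes the encoded text fits in 64 bytes; put() throws otherwise.
    static String roundTrip(String text) {
        ByteBuffer buf = ByteBuffer.allocate(64);
        buf.put(text.getBytes(StandardCharsets.UTF_8));  // filling: position advances
        buf.flip();                                      // limit = position, position = 0
        byte[] out = new byte[buf.remaining()];
        buf.get(out);                                    // draining: read up to limit
        buf.clear();                                     // reset for the next fill
        return new String(out, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.put((byte) 1).put((byte) 2);
        System.out.println(buf.position() + " " + buf.limit());  // prints "2 8"
        buf.flip();
        System.out.println(buf.position() + " " + buf.limit());  // prints "0 2"
        System.out.println(roundTrip("hello"));                  // prints "hello"
    }
}
```

    A channel's read() fills a buffer the same way put() does, and write() drains it the same way get() does, so a handler that reads into a buffer and then writes from it needs a flip() in between.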

    Reactor Design

    package src.reactorNIO;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    /**
     * @Author deng shuo
     * @Date 5/27/21 21:24
     * @Version 1.0
     */
    public class NIOReactor implements Runnable {
        final Selector selector;
        final ServerSocketChannel serverSocketChannel;
    
        NIOReactor(int port) throws IOException {
            selector = Selector.open();
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
    
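            // switch the channel to non-blocking mode
            serverSocketChannel.configureBlocking(false);
            // register interest in accept events
            SelectionKey selectionKey = serverSocketChannel.register(selector,
                                                            SelectionKey.OP_ACCEPT);
            // attach the callback that runs when a connection is ready to be accepted
            selectionKey.attach(new Acceptor());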
    
        }
    
        @Override
        public void run() {
            try{
                while(!Thread.interrupted()){
                    selector.select();
                    Set<SelectionKey> selected = selector.selectedKeys();
                    for (SelectionKey key : selected) {
                        // dispatch each ready key to its attached handler
                        dispatch(key);
                    }
                    // clear the set so handled keys are not dispatched again
                    selected.clear();
    
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    
        private void dispatch(SelectionKey selectionKey){
            // run the handler object attached to the SelectionKey
            Runnable r =  (Runnable)(selectionKey.attachment());
            if(r != null){
                r.run();
            }
        }
    
        class Acceptor implements Runnable{
    
            @Override
            public void run() {
                try{
    
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    if(socketChannel != null){
                        new NIOHandler(selector,socketChannel);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    
    

    Handler Design

    package src.reactorNIO;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    
    /**
     * @Author deng shuo
     * @Date 5/27/21 21:40
     * @Version 1.0
     */
    public final class NIOHandler implements Runnable {
    
        private static final int MAX_IN = 1024 *1024 ;
        private static final int MAX_OUT = 1024 *1024;
        final SocketChannel socketChannel;
        final SelectionKey selectionKey;
    
        ByteBuffer input = ByteBuffer.allocate(MAX_IN);
        ByteBuffer output = ByteBuffer.allocate(MAX_OUT);
        static final int READING = 0, SENDING = 1;
        int state = READING;
    
        NIOHandler(Selector s,SocketChannel c) throws IOException {
            socketChannel = c;
            socketChannel.configureBlocking(false);
            selectionKey = socketChannel.register(s,0);
            selectionKey.attach(this);
            selectionKey.interestOps(SelectionKey.OP_READ);
            s.wakeup();
        }
    
        @Override
        public void run() {
            try{
                if(state == READING){
                    read();
                }else if(state == SENDING){
                    send();
                }
            }catch (IOException ie){
                ie.printStackTrace();
            }
        }
    
        void read() throws IOException {
            // read() returns -1 once the peer has closed the connection
            if (socketChannel.read(input) == -1) {
                socketChannel.close();   // closing the channel also cancels its key
                return;
            }
            if (inputIsComplete()) {
                process();
                state = SENDING;
                // Normally also do first write now
                selectionKey.interestOps(SelectionKey.OP_WRITE);
            }
        }

        void send() throws IOException {
            socketChannel.write(output);
            if (outputIsComplete()) {
                selectionKey.cancel();
            }
        }
    
        boolean inputIsComplete() { /* ... */ return true;}
        boolean outputIsComplete() { /* ... */return true;}
        void process() { /* ... */ }
    }
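
    The reactor and handler above leave inputIsComplete(), outputIsComplete(), and process() as stubs. For a version that can be run end to end, here is a compact single-threaded sketch that folds both roles into one class and simply echoes whatever it reads. EchoReactor and its port() helper are hypothetical names, and the write loop is a simplification: a fuller handler would switch to OP_WRITE as NIOHandler does instead of looping.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// One thread, one Selector; each key carries a Runnable as its handler.
class EchoReactor implements Runnable, AutoCloseable {
    private final Selector selector;
    private final ServerSocketChannel server;

    EchoReactor(int port) throws IOException {
        selector = Selector.open();
        server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(port));       // port 0 picks a free port
        server.configureBlocking(false);
        // The attachment is the acceptor callback.
        server.register(selector, SelectionKey.OP_ACCEPT, (Runnable) this::accept);
    }

    int port() {
        return server.socket().getLocalPort();
    }

    @Override
    public void run() {
        try {
            while (selector.isOpen()) {
                selector.select();                      // demultiplex: block for events
                if (!selector.isOpen()) {
                    break;
                }
                for (SelectionKey key : selector.selectedKeys()) {
                    ((Runnable) key.attachment()).run(); // dispatch
                }
                selector.selectedKeys().clear();
            }
        } catch (IOException | ClosedSelectorException e) {
            // the selector was closed: leave the loop
        }
    }

    private void accept() {
        try {
            SocketChannel ch = server.accept();
            if (ch == null) {
                return;
            }
            ch.configureBlocking(false);
            // Register the new connection with its own read handler.
            ch.register(selector, SelectionKey.OP_READ, (Runnable) () -> echo(ch));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void echo(SocketChannel ch) {
        try {
            ByteBuffer buf = ByteBuffer.allocate(1024);
            if (ch.read(buf) == -1) {                   // peer closed the connection
                ch.close();                             // also cancels the key
                return;
            }
            buf.flip();                                 // switch from filling to draining
            while (buf.hasRemaining()) {
                ch.write(buf);                          // simplification: see lead-in
            }
        } catch (IOException e) {
            try { ch.close(); } catch (IOException ignored) { }
        }
    }

    @Override
    public void close() throws IOException {
        selector.close();   // wakes up a blocked select()
        server.close();
    }
}
```

    To try it, start it on its own thread, for example new Thread(new EchoReactor(8085)).start(), and connect with any TCP client. Because accept() and echo() both run on the selector thread, no locking is needed, but a slow handler delays every other connection.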
    
    

    Appendix

    Socket implementation on Linux

    • Socket address structures
    // The sin_len / sa_len fields below exist on BSD and macOS; Linux omits them
    // IP address
    struct in_addr {
        in_addr_t s_addr;             /* Address in network byte order */
    };
    // IP socket address structure
    struct sockaddr_in {
        __uint8_t       sin_len;
        sa_family_t     sin_family;   /* Protocol family */
        in_port_t       sin_port;     /* Port number in network byte order */
        struct  in_addr sin_addr;     /* IP address in network byte order */
        char            sin_zero[8];
    };
    // Generic socket address structure (for connect, bind, accept)
    struct sockaddr {
        __uint8_t       sa_len;         /* total length */
        sa_family_t     sa_family;      /* [XSI] address family */
        char            sa_data[14];    /* [XSI] addr value (actually larger) */
    };
    
    • Server implementation
    #include<stdio.h>
    #include<string.h>
    #include<stdlib.h>
    #include<unistd.h>
    #include<arpa/inet.h>
    #include<sys/socket.h>
    #include<netinet/in.h>

    int main(){

        // Create the socket
        // int socket(int af,int type,int protocol)
        // af (address family): AF_INET, AF_INET6
        // type (socket type): SOCK_STREAM, SOCK_DGRAM
        // protocol (transport protocol): IPPROTO_TCP, IPPROTO_UDP
        int serv_socket = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
        if(serv_socket < 0){
            perror("socket");
            return 1;
        }

        // Bind the socket to an IP address and port
        // socket address = IP address + port
        struct sockaddr_in serv_addr;
        memset(&serv_addr,0,sizeof(serv_addr));

        serv_addr.sin_family = AF_INET;
        serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
        // uint16_t htons(uint16_t hostshort)
        // The address structure stores values in network byte order (big-endian)
        // Host byte order may be little-endian
        serv_addr.sin_port = htons(1234);

        // int bind(int, const struct sockaddr *, socklen_t)
        if(bind(serv_socket,(struct sockaddr *)&serv_addr,sizeof(serv_addr)) < 0){
            perror("bind");
            close(serv_socket);
            return 1;
        }

        // A newly created socket is an active socket by default
        // listen converts serv_socket from an active socket into a listening socket
        // int listen(int sockfd,int backlog)
        if(listen(serv_socket,20) < 0){
            perror("listen");
            close(serv_socket);
            return 1;
        }

        // Accept a client connection
        struct sockaddr_in client_addr;
        socklen_t client_addr_size = sizeof(client_addr);

        int client_socket = accept(serv_socket,(struct sockaddr*)&client_addr,&client_addr_size);
        if(client_socket < 0){
            perror("accept");
            close(serv_socket);
            return 1;
        }

        // Send data to the client (sizeof includes the terminating '\0')
        char str[] = "hello the world";
        write(client_socket,str,sizeof(str));

        // Close both sockets
        close(client_socket);
        close(serv_socket);

        return 0;
    }
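
    The htons() call in the server converts the port to network byte order. As a small illustration of what that means for port 1234 (0x04D2), the sketch below encodes the same value in both byte orders with java.nio.ByteBuffer; ByteOrderDemo and encodePort are hypothetical names used only for this example.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

class ByteOrderDemo {
    // Encodes a 16-bit port number in the given byte order.
    static byte[] encodePort(int port, ByteOrder order) {
        return ByteBuffer.allocate(2).order(order).putShort((short) port).array();
    }

    public static void main(String[] args) {
        byte[] net = encodePort(1234, ByteOrder.BIG_ENDIAN);     // network byte order
        byte[] le = encodePort(1234, ByteOrder.LITTLE_ENDIAN);   // common host byte order
        System.out.printf("%02x %02x%n", net[0], net[1]);        // prints "04 d2"
        System.out.printf("%02x %02x%n", le[0], le[1]);          // prints "d2 04"
    }
}
```

    On a little-endian host, htons(1234) performs this byte swap; on a big-endian host it leaves the value unchanged.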
    
    • Client implementation
    #include<stdio.h>
    #include<string.h>
    #include<stdlib.h>
    #include<unistd.h>
    #include<arpa/inet.h>
    #include<sys/socket.h>
    #include<netinet/in.h>

    int main(){

        // Create the socket
        int sock = socket(AF_INET,SOCK_STREAM,0);
        if(sock < 0){
            perror("socket");
            return 1;
        }

        // Address of the server to connect to
        struct sockaddr_in serv_addr;

        memset(&serv_addr,0,sizeof(serv_addr));

        serv_addr.sin_family = AF_INET;
        serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
        serv_addr.sin_port = htons(1234);

        // int connect(int clientfd, const struct sockaddr *, socklen_t)
        // Establish the connection to the server
        if(connect(sock,(struct sockaddr*)&serv_addr,sizeof(serv_addr)) < 0){
            perror("connect");
            close(sock);
            return 1;
        }

        // Read the server's reply and terminate it before printing
        char buffer[40];
        ssize_t n = read(sock,buffer,sizeof(buffer)-1);
        if(n < 0){
            perror("read");
            close(sock);
            return 1;
        }
        buffer[n] = '\0';

        printf("message from server: %s\n",buffer);

        // Close the socket
        close(sock);

        return 0;
    }
    
    // Start the server; it waits for a connection
    ~/Code/C++Projects/SocketTest $ ./server

    // Start the client
    ~/Code/C++Projects/SocketTest $ ./client
    message from server: hello the world
    

    References

    Reactor pattern

    Scalable IO in Java - Doug Lea

  • Original article: https://www.cnblogs.com/GeekDanny/p/14820135.html