zoukankan      html  css  js  c++  java
  • 一个 Reactor 模型的实现

      Reactor 模型不再介绍,网上有很多。在实现的过程中有一些收获:

      1. Reactor 广义上说,事件发生时便会触发为事件注册的处理函数。在注册感兴趣的事件时,需要将处理函数一并绑定到事件上,在 NIO 中我们通过 SelectionKey 的 attachment 携带函数对象。

      2. 使用 NIO ,应该摒弃 BIO 编程时的阻塞思维。如:

                while(writeBuffer.remaining()>0){
                    socketChannel.write(writeBuffer);
                }

      这种繁忙等待的写法,放在以 NIO 为内核的代码中会比较另类,白白占用了 CPU 资源。

      完全可以依赖其事件通知机制,在写事件未就绪时直接返回让线程做其它事,等待写事件到来再触发写入数据的方法。

      当然这种处理方式需要我们自己判断是否写入已完成,因为一次写入可能需要多次写事件。

      并且一旦写事件完成,我们必须取消 selector 对该 channel 写事件的关注。因为写事件就绪是指 socket 在内核空间的输出缓冲区还有剩余空间可写,如果不取消关注,即使没有数据需要发送,只要该 socket 的内核输出缓冲区还有空间,就会不断的触发写事件。这可能浪费大量 CPU 资源。

      这样,在发送数据量较大且网速较慢时,CPU 不会将时间浪费在等待 socket 输出缓冲区有空余 上。充分发挥 NIO 的性能优势。

      3. 使用 TCP 协议进行数据传输需要我们自己实现报文的定界,避免 分包 与 粘包 的问题。在 NIO 的处理中,我们可以将多读的或者需要补齐的数据放在 SelectionKey 的 attachment 中,等待该 key 下次被激活再接着处理。

      另外牵扯四次挥手,如果我们使用短连接,服务端响应后直接 close 掉连接,客户端接收响应报文时不需要定界。因为服务端发送了关闭请求,客户端在读完接收缓冲区数据后 read 会返回 -1。否则 read 可能返回 0 而不是 -1,也就是说长连接下不能通过 read 的返回值进行定界。

      4. 本次实现采用报文头两个字节为 16 进制报文长度,靠该长度进行定界的策略。也可以采用其他策略定界,可以在凑齐一条完整报文后将报文直接转交给业务处理函数处理。也可以将凑齐的报文保存在有序队列中,采用其他线程顺序的处理这些报文,将报文的接收与处理异步化。

      5. 本类实现仅用于学习,生产时应采用 Netty 等成熟框架,避免自己直接开发,除非项目中有成员深谙此领域的弯弯绕绕。

      下面是实现:

    /**
     * @Author Niuxy
     * @Date 2020/5/31 8:15 下午
     * @Description 最基础的 reactor 模型
     * 本模型不负责 serverSocket 与 selector 的初始化
     * 入口处的并发需要通过负载策略将 socketChannel 的事件注册到不同的 selector 上,待验证
     * selector 底层使用的是 OS 提供给我们的事件通知器
     */
    public class BasicReator implements Runnable {
        ServerSocketChannel serverSocketChannel;
        Selector selector;
    
        public BasicReator(ServerSocketChannel serverSocketChannel, Selector selector)
                throws ClosedChannelException {
            this.serverSocketChannel = serverSocketChannel;
            this.selector = selector;
            SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            key.attach(new Acceptor());
        }
    
        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    selector.select();
                    Set keySet = selector.selectedKeys();
                    Iterator iterator = keySet.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = (SelectionKey) iterator.next();
                        iterator.remove();
                        dispatch(key);
                    }
                }
            } catch (RuntimeException re) {
                // 接收策略层面没有正确解析报文会抛出 RuntimeException,为了测试,遇到该异常立即终止
                throw re;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * @Author Niuxy
         * @Date 2020/5/31 7:23 下午
         * @Description 核心的部分
         * 无论什么事件被触发,我们均执行与事件绑定的方法
         * 在注册感兴趣的事件时,就将处理该事同步件的方法绑定到了对应的 selectionKey 中
         */
        private void dispatch(SelectionKey key) {
            Runnable att = (Runnable) key.attachment();
            try {
                if (att == null) {
                    return;
                }
                att.run();
            } catch (RuntimeException re) {
                throw re;
            } catch (Exception e) {
                //dispatch 层面面向唯一连接,具有处理异常的能力,捕获处理不影响其它连接
                //TO DO
                e.printStackTrace();
            }
        }
    
        /**
         * @Author Niuxy
         * @Date 2020/5/31 7:41 下午
         * @Description 处理连接请求,注册读写事件并绑定读写事件处理器
         */
        class Acceptor implements Runnable {
            @Override
            public void run() {
                try {
                    SocketChannel socketChannel = serverSocketChannel.accept();
                    if (socketChannel == null) {
                        return;
                    }
                    socketChannel.configureBlocking(false);
                    SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
                    //报文接收策略,与连接一一绑定
                    MessageHandler messageHandler = new PrintMessageHandlerImpl();
                    ReciveRegister reciveRegister = new HLRegisterImpl(2, messageHandler);
                    //注册 key 的同时 将事件处理的 "回调" 函数绑定到 key 上
                    key.attach(new Handler(socketChannel, key, reciveRegister));
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * @Author Niuxy
         * @Date 2020/5/31 11:44 下午
         * @Description 读写处理器
         */
        class Handler implements Runnable {
            public static final int READING = 0, WRITING = 1;
            SocketChannel socketChannel;
            SelectionKey key;
            /**
             * @Author Niuxy
             * @Date 2020/6/2 9:29 下午
             * @Description
             * 在响应上一个请求前,我们不希望处理下一个请求,因此在 Handler 维护一个状态位,标识目前应当
             * 处理读事件还是写事件
             * 我们必须保证接收和回复的顺序性,保证客户端可以对响应做出正确的处理
             * 当然也有其它的处理方式,我们将响应数据装入一个有序队列,并顺序的处理这些响应。或者通过令牌将请求和响应
             * 进行对应。
             */
            int state = READING;
            ReciveRegister reciveRegister;
            String readResult = null;
            ByteBuffer writeBuffer = null;
    
            Handler(SocketChannel channel, SelectionKey key, ReciveRegister reciveRegister) {
                this.socketChannel = channel;
                this.key = key;
                this.reciveRegister = reciveRegister;
            }
    
            @Override
            public void run() {
                try {
                    if (state == READING) {
                        read();
                    } else {
                        write();
                    }
                    //TO DO :write() method
                } catch (RuntimeException rex) {
                    throw rex;
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
            private void read() throws Exception {
                String re = reciveRegister.doRecive(socketChannel);
                if (re != null && re != "") {
                    readResult = re;
                    state = WRITING;
                    key.interestOps(SelectionKey.OP_WRITE);
                }
            }
    
            private void write() throws IOException {
                if (this.readResult == null || readResult == "") {
                    return;
                }
                //如果不是第一次触发写事件,接着上次的写
                if (writeBuffer == null) {
                    writeBuffer = ByteBuffer.wrap(this.readResult.getBytes());
                }
    
                //该循环处理发送缓冲区处理速度小于网卡发送速度,无法一次性将 buffer 中数据写入发送缓冲区的情况
                socketChannel.write(writeBuffer);
                if (writeBuffer.position() != writeBuffer.limit()) {
                    return;
                }
                writeBuffer = null;
                readResult = null;
                state = READING;
                //写完将兴趣移除,否则会将 CPU 跑满
           key.interestOps(SelectionKey.OP_READ); } } }

      定界策略与服务分发通过策略模式进行了解耦,定界策略的实现:

    public interface ReciveRegister {
        public String doRecive(SocketChannel socketChannel) throws Exception;
    }
    /**
     * @Author Niuxy
     * @Date 2020/5/28 8:36 下午
     * @Description 报文头标示报文长度的定界策略,由于存在类成员缓冲区的原因需要与连接一一绑定
     */
    public class HLRegisterImpl implements ReciveRegister {
        //报文头长度
        private int headLength = 0;
        //报文内容长度
        private int messageLength = 0;
    
        private MessageHandler messageHandler;
    
        boolean isCache = false;
        ByteBuffer headCacheBuffer;
        ByteBuffer messageCacheBuffer;
    
        public HLRegisterImpl(int headLength, MessageHandler messageHandler) {
            this.messageHandler = messageHandler;
            this.headLength = headLength;
            headCacheBuffer = ByteBuffer.allocate(headLength);
        }
    
        @Override
        public String doRecive(SocketChannel socketChannel) throws Exception {
            //判断是否已读取报文头
            if (messageLength == 0) {
                int readLen = socketChannel.read(headCacheBuffer);
                if (Util.isFullBuffer(headCacheBuffer)) {
                    headCacheBuffer.flip();
                    String messageLenthStr = Util.bytes2HexString(headCacheBuffer.array());
    //                messageLength = headCacheBuffer.getInt();
                    messageLength = Integer.parseInt(messageLenthStr, 16);
                    System.out.println(messageLength);
                    messageCacheBuffer = ByteBuffer.allocate(messageLength);
                    headCacheBuffer.clear();
                    return null;
                }
            } else {
                int readLen = socketChannel.read(messageCacheBuffer);
                String re = "";
                if (Util.isFullBuffer(messageCacheBuffer)) {
                    re = messageHandler.doHandler(socketChannel, messageCacheBuffer);
                    messageLength = 0;
                    headLength = 0;
                    messageCacheBuffer = null;
                    System.gc();
                    return re;
                }
                return null;
            }
            return null;
        }
    
    }

      报文处理:

    public interface MessageHandler {
        public String doHandler(SocketChannel socketChannel,ByteBuffer messageBuffer) throws Exception;
    }
    public class PrintMessageHandlerImpl implements MessageHandler {
        String target = "hellow server!hellow server!hellow server!hellow server!hellow server!hellow server!hellow server!hellowhellow server!hellow server!hellow server!hellow server!hellow server!hellow server!hellow";
    
        @Override
        public String doHandler(SocketChannel socketChannel, ByteBuffer messageBuffer) throws Exception {
            String message = new String(messageBuffer.array());
            String re = "";
            if (!target.equals(message)) {
                System.out.println("error!: " + message);
                re="error&/n/*";
            } else {
                System.out.println("success!");
                re="successsuccesssuccesssuccesssuccesssuccesssuccesssuccesssuccesssuccesssuccesssuccesssuccesssuccesssucc" +
                        "esssuccesssuccesssuccesssuccesssuccesssuccesssuccesssuccesssuccesssuccesssuccesssuccesssucces"
                        + "successsuccesssuccesssuccesssuccesssuccesssuccesssuccessssuccesssuccesssuccesssuccesssuccesss" +
                        "uccesssuccesssuccesssuccesssuccesssuccesssuccess!&/n/*";
            }
            messageBuffer = null;
            return re;
        }
    }

      测试客户端,直接使用 BIO 编写:

    public class TestClient {
        public static void main(String[] args) throws Exception {
            final String msg = "hellow server!hellow server!hellow server!hellow server!hellow server!hellow server!hellow server!hellowhellow server!hellow server!hellow server!hellow server!hellow server!hellow server!hellow";
            Thread thread0 = new Thread(() -> {
                sendMsg(msg);
            });
            Thread thread1 = new Thread(() -> {
                sendMsg(msg);
            });
            Thread thread2 = new Thread(() -> {
                sendMsg(msg);
            });
            Thread thread3 = new Thread(() -> {
                sendMsg(msg);
            });
            thread0.start();
            thread1.start();
            thread2.start();
            thread3.start();
    
        }
    
        private static void sendMsg(String msg) {
            try {
                for (int i = 0; i < 100; i++) {
                    send(msg);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static void send(String message) throws Exception {
            Socket socket = new Socket("127.0.0.1", 8000);
            byte[] messageBytes = message.getBytes();
            Integer length = messageBytes.length;
            System.out.println(length);
            for (int i0 = 0; i0 < 10; i0++) {
                OutputStream outputStream = socket.getOutputStream();
                String lenStr16 = Util.numToHex16(length);
                outputStream.write(Util.hexTobytes(lenStr16));
                Thread.sleep(100);
                outputStream.write(messageBytes);
                outputStream.flush();
                BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
                byte[] buffer = new byte[100];
                int len = 0;
                StringBuilder sb = new StringBuilder();
                while ((len = in.read(buffer)) != -1) {
                    //重要!为了复用 buffer,我们必须知道哪些是新写入的数据,哪些是已经处理过的数据
                    String data = new String(buffer, 0, len);
                    sb.append(data);
                    //必须有自己的定界方式。否则只要服务端不关闭连接,len 永远不会是 -1,必须用特定的方式标识某次读取已结束!
                    if (data != null && data.contains("&/n/*")) {
                        break;
                    }
                }
                System.out.println(sb.substring(0));
            }
        }
    
        private static void printArr(byte[] arr) {
            for (int i = 0; i < arr.length; i++) {
                System.out.print(arr[i]);
            }
        }
    
    }
  • 相关阅读:
    Vue-router 报NavigationDuplicated的可能解决方案
    go 数据类型转换
    在vscode 之中使用 GO MOD
    javascript格式化
    Mac node-sass 安装失败“v8::String::Utf8Value”
    Django 使用gunicorn 和 supervisord部署
    关于windows上的账号(权限)切换
    python中的global关键字
    暂时性的小总结
    windwos 安装下kafka的安装使用
  • 原文地址:https://www.cnblogs.com/niuyourou/p/13034696.html
Copyright © 2011-2022 走看看