zoukankan      html  css  js  c++  java
  • 网络编程 -- RPC实现原理 -- NIO多线程 -- 迭代版本V2

     网络编程 -- RPC实现原理 -- 目录

      啦啦啦

    V2——增加WriteQueue队列,存放selectionKey。addWriteEventToQueue()添加selectionKey并唤醒阻塞的selector。等selector唤醒之后再注册OP_WRITE事件。

    selectionKey.cancel();清除key对应事件之后,由于多线程 main线程和对应的IO线程会抢夺selector资源。

    在selector.select()和sc.register(selectionKey.selector(), SelectionKey.OP_WRITE);处会发生死锁,因此将需要注册IO操作的selectionKey放入队列并唤醒selector,之后在注册IO操作。

    NIO 多线程

    1.打开Selector
    2.打开ServerSocketChannel
    3.获取与此Channel关联的ServerSocket并绑定地址
    4.设置Channel为非阻塞
    5.将Channel注册到Selector并指定操作位

    6.阻塞select()返回具有操作的通道个数。如果唤醒之后就绪通道数小于1,则将WriteQueue、ReadQueue队列中的所有sc注册到selector中并分别设置写操作位、读操作位。
    7.获取SelectionKeys遍历,最后移除
    8.根据key判断操作:isAcceptable、isConnectable、isReadable、isWritable
    9.1.isAcceptable:根据key获取Channel,accept()获取SocketChannel并设置为非阻塞模式,注册到Selector并指定读操作位和缓冲区。
    9.2.isConnectable:仅仅就是连接状态
    9.3.isReadable:取消Read事件,启动Read线程处理IO读操作。添加selectionKey到WriteQueue中并唤醒selector。
    9.4.isWritable:取消Write事件,启动Write线程处理IO写操作。添加selectionKey到ReadQueue中并唤醒selector。

        private static Queue<SelectionKey> writeQueue = new LinkedBlockingQueue<SelectionKey>();
        public static void addWriteEventToQueue(SelectionKey selectionKey) {
            System.out.println(Thread.currentThread() + " -- -- -- 添加selectionKey到队列,并唤醒selector");
            writeQueue.add(selectionKey);
            selectionKey.selector().wakeup();
        }
           int n = selector.select();
                System.out.println(Thread.currentThread() + " 事件就绪通道个数 : " + n);
                if(n < 1){
                    System.out.println(Thread.currentThread() + " -- -- -- selector被唤醒,注册队列中所有socketChannel为OP_WRITE操作");
                    Iterator<SelectionKey> iterator = writeQueue.iterator();
                    while(iterator.hasNext()){
                        SelectionKey selectionKey = iterator.next();
                        SocketChannel sc = (SocketChannel) selectionKey.channel();
                        sc.register(selectionKey.selector(), SelectionKey.OP_WRITE);
                    }
                }

      Class : Service

    package lime.pri.limeNio.optimize.socket4;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Queue;
    import java.util.Set;
    import java.util.concurrent.LinkedBlockingQueue;
    
    public class Service {
    
        public static void main(String[] args) throws IOException, InterruptedException {
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new InetSocketAddress(9999));
            serverSocketChannel.configureBlocking(false);
            Selector selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                System.out.println(Thread.currentThread() + " 监听端口@9999,等待客户端连接...");
                int n = selector.select();
                System.out.println(Thread.currentThread() + " 事件就绪通道个数 : " + n);
                if(n < 1){
                    System.out.println(Thread.currentThread() + " -- -- -- selector被唤醒,注册队列中所有socketChannel为OP_WRITE操作");
                    Iterator<SelectionKey> iterator = writeQueue.iterator();
                    while(iterator.hasNext()){
                        SelectionKey selectionKey = iterator.next();
                        SocketChannel sc = (SocketChannel) selectionKey.channel();
                        sc.register(selectionKey.selector(), SelectionKey.OP_WRITE);
                    }
                }
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectedKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey selectionKey = iterator.next();
                    iterator.remove();
                    if (selectionKey.isValid() && selectionKey.isAcceptable()) {
                        System.out.println(Thread.currentThread() + " -- -- -- 处理Acceptable事件");
                        ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
                        SocketChannel sc = ssc.accept();
                        sc.configureBlocking(false);
                        sc.register(selector, SelectionKey.OP_READ);
                    }
                    if (selectionKey.isValid() && selectionKey.isReadable()) {
                        System.out.println(Thread.currentThread() + " -- -- -- 处理Readable事件");
                        selectionKey.cancel();
                        RequestProcessor.proceess(selectionKey, selector);
                    }
                    if (selectionKey.isValid() && selectionKey.isWritable()) {
                        System.out.println(Thread.currentThread() + " -- -- -- 处理Writable事件");
                        selectionKey.cancel();
                        ResponseProcessor.proceess(selectionKey, selector);
                    }
                }
            }
        }
    
        private static Queue<SelectionKey> writeQueue = new LinkedBlockingQueue<SelectionKey>();
        public static void addWriteEventToQueue(SelectionKey selectionKey) {
            System.out.println(Thread.currentThread() + " -- -- -- 添加selectionKey到队列,并唤醒selector");
            writeQueue.add(selectionKey);
            selectionKey.selector().wakeup();
        }
    }

      Class : RequestProcessor

    package lime.pri.limeNio.optimize.socket4;
    
    import java.io.ByteArrayOutputStream;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class RequestProcessor {
    
        private static ExecutorService exec = Executors.newFixedThreadPool(2);
    
        public static void proceess(final SelectionKey selectionKey, final Selector selector) {
            exec.submit(new Runnable() {
    
                public void run() {
                    try {
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        byteBuffer.clear();
                        SocketChannel sc = (SocketChannel) selectionKey.channel();
                        ByteArrayOutputStream bos = new ByteArrayOutputStream();
                        while(sc.read(byteBuffer) != -1){
                            bos.write(byteBuffer.array());
                            byteBuffer.clear();
                        }
                        System.out.println(Thread.currentThread() + " 客户端( " + sc.getRemoteAddress() + " ) 请求 : " + bos.toString());
                        Service.addWriteEventToQueue(selectionKey);
                    } catch (Exception e) {
                    }
                }
            });
        }
    
    }

      Class : ResponseProcessor

    package lime.pri.limeNio.optimize.socket4;
    
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Date;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class ResponseProcessor {
    
        private static ExecutorService exec = Executors.newFixedThreadPool(2);
    
        public static void proceess(final SelectionKey selectionKey, final Selector selector) {
            exec.submit(new Runnable() {
    
                public void run() {
                    try {
                        String response = "服务端响应 : " + new Date().toString();
                        ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBytes());
                        SocketChannel sc = (SocketChannel) selectionKey.channel();
                        sc.write(byteBuffer);
                        sc.close();
                    } catch (IOException e) {
                    }
                }
            });
        }
    
    }

      Class : Client

    package lime.pri.limeNio.optimize.socket4;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;
    
    public class Client {
    
        public static void main(String[] args) throws IOException {
            for (int i = 0; i < 10; i++) {
                new Thread() {
                    @Override
                    public void run() {
                        try {
                            SocketChannel socketChannel = SocketChannel.open();
                            socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999));
    
                            socketChannel.write(ByteBuffer.wrap("Query Date".getBytes()));
                            socketChannel.shutdownOutput();
    
                            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                            socketChannel.read(byteBuffer);
                            System.out.println(new String(byteBuffer.array()));
                            socketChannel.close();
                        } catch (IOException e) {
                        }
                    }
    
                }.start();
            }
        }
    }

      Console: Server

    Thread[main,5,main] 监听端口@9999,等待客户端连接...
    Thread[main,5,main] 事件就绪通道个数 : 1
    Thread[main,5,main] -- -- -- 处理Acceptable事件
    Thread[main,5,main] 监听端口@9999,等待客户端连接...
    Thread[main,5,main] 事件就绪通道个数 : 1
    Thread[main,5,main] -- -- -- 处理Acceptable事件
    Thread[main,5,main] 监听端口@9999,等待客户端连接...
    Thread[main,5,main] 事件就绪通道个数 : 3
    Thread[main,5,main] -- -- -- 处理Acceptable事件
    Thread[main,5,main] -- -- -- 处理Readable事件
    Thread[main,5,main] -- -- -- 处理Readable事件
    Thread[main,5,main] 监听端口@9999,等待客户端连接...
    Thread[main,5,main] 事件就绪通道个数 : 2
    Thread[main,5,main] -- -- -- 处理Acceptable事件
    Thread[main,5,main] -- -- -- 处理Readable事件
    Thread[main,5,main] 监听端口@9999,等待客户端连接...
    Thread[main,5,main] 事件就绪通道个数 : 2
    Thread[main,5,main] -- -- -- 处理Readable事件
    Thread[main,5,main] -- -- -- 处理Acceptable事件
    Thread[pool-1-thread-2,5,main] 客户端( /127.0.0.1:4643 ) 请求 : Query Date
    Thread[pool-1-thread-2,5,main] -- -- -- 添加selectionKey到队列,并唤醒selector
    Thread[main,5,main] 监听端口@9999,等待客户端连接...
    Thread[main,5,main] 事件就绪通道个数 : 2
    Thread[main,5,main] -- -- -- 处理Readable事件
    Thread[main,5,main] -- -- -- 处理Acceptable事件
    Thread[main,5,main] 监听端口@9999,等待客户端连接...
    Thread[main,5,main] 事件就绪通道个数 : 2
    Thread[main,5,main] -- -- -- 处理Readable事件
    Thread[main,5,main] -- -- -- 处理Acceptable事件
    Thread[main,5,main] 监听端口@9999,等待客户端连接...
    Thread[main,5,main] 事件就绪通道个数 : 2
    Thread[main,5,main] -- -- -- 处理Acceptable事件
    Thread[pool-1-thread-2,5,main] 客户端( /127.0.0.1:4645 ) 请求 : Query Date
    Thread[main,5,main] -- -- -- 处理Readable事件
    Thread[pool-1-thread-1,5,main] 客户端( /127.0.0.1:4644 ) 请求 : Query Date
    Thread[pool-1-thread-1,5,main] -- -- -- 添加selectionKey到队列,并唤醒selector
    Thread[pool-1-thread-2,5,main] -- -- -- 添加selectionKey到队列,并唤醒selector
    Thread[main,5,main] 监听端口@9999,等待客户端连接...
    Thread[main,5,main] 事件就绪通道个数 : 0
    Thread[pool-1-thread-2,5,main] 客户端( /127.0.0.1:4646 ) 请求 : Query Date
    Thread[main,5,main] -- -- -- selector被唤醒,注册队列中所有socketChannel为OP_WRITE操作
    Thread[pool-1-thread-2,5,main] -- -- -- 添加selectionKey到队列,并唤醒selector
    Thread[pool-1-thread-1,5,main] 客户端( /127.0.0.1:4642 ) 请求 : Query Date
    Thread[pool-1-thread-1,5,main] -- -- -- 添加selectionKey到队列,并唤醒selector
    Thread[main,5,main] 监听端口@9999,等待客户端连接...
    Thread[pool-1-thread-2,5,main] 客户端( /127.0.0.1:4647 ) 请求 : Query Date
    Thread[pool-1-thread-2,5,main] -- -- -- 添加selectionKey到队列,并唤醒selector
    Thread[pool-1-thread-1,5,main] 客户端( /127.0.0.1:4648 ) 请求 : Query Date
    Thread[main,5,main] 事件就绪通道个数 : 0
    Thread[main,5,main] -- -- -- selector被唤醒,注册队列中所有socketChannel为OP_WRITE操作
    Thread[pool-1-thread-1,5,main] -- -- -- 添加selectionKey到队列,并唤醒selector
    Thread[main,5,main] 监听端口@9999,等待客户端连接...
    Thread[main,5,main] 事件就绪通道个数 : 0
    Thread[main,5,main] -- -- -- selector被唤醒,注册队列中所有socketChannel为OP_WRITE操作
    Thread[main,5,main] 监听端口@9999,等待客户端连接...
    Thread[main,5,main] 事件就绪通道个数 : 9
    Thread[main,5,main] -- -- -- 处理Readable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[pool-1-thread-2,5,main] 客户端( /127.0.0.1:4649 ) 请求 : Query Date
    Thread[pool-1-thread-2,5,main] -- -- -- 添加selectionKey到队列,并唤醒selector
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Acceptable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] 监听端口@9999,等待客户端连接...
    Thread[main,5,main] 事件就绪通道个数 : 0
    Thread[main,5,main] -- -- -- selector被唤醒,注册队列中所有socketChannel为OP_WRITE操作
    Thread[main,5,main] 监听端口@9999,等待客户端连接...
    Thread[main,5,main] 事件就绪通道个数 : 10
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Acceptable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Readable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] 监听端口@9999,等待客户端连接...
    Thread[main,5,main] 事件就绪通道个数 : 1
    Thread[main,5,main] -- -- -- 处理Readable事件
    Thread[main,5,main] 监听端口@9999,等待客户端连接...
    Thread[pool-1-thread-1,5,main] 客户端( /127.0.0.1:4650 ) 请求 : Query Date
    Thread[pool-1-thread-1,5,main] -- -- -- 添加selectionKey到队列,并唤醒selector
    Thread[pool-1-thread-2,5,main] 客户端( /127.0.0.1:4651 ) 请求 : Query Date
    Thread[main,5,main] 事件就绪通道个数 : 0
    Thread[main,5,main] -- -- -- selector被唤醒,注册队列中所有socketChannel为OP_WRITE操作
    Thread[main,5,main] 监听端口@9999,等待客户端连接...
    Thread[pool-1-thread-2,5,main] -- -- -- 添加selectionKey到队列,并唤醒selector
    Thread[main,5,main] 事件就绪通道个数 : 9
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] 监听端口@9999,等待客户端连接...
    Thread[main,5,main] 事件就绪通道个数 : 0
    Thread[main,5,main] -- -- -- selector被唤醒,注册队列中所有socketChannel为OP_WRITE操作
    Thread[main,5,main] 监听端口@9999,等待客户端连接...
    Thread[main,5,main] 事件就绪通道个数 : 10
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] -- -- -- 处理Writable事件
    Thread[main,5,main] 监听端口@9999,等待客户端连接...

      Class : Client

    Thread[Thread-7,5,main] 服务端响应 : Sat Jun 24 16:00:41 CST 2017
    Thread[Thread-0,5,main] 服务端响应 : Sat Jun 24 16:00:41 CST 2017
    Thread[Thread-3,5,main] 服务端响应 : Sat Jun 24 16:00:41 CST 2017
    Thread[Thread-9,5,main] 服务端响应 : Sat Jun 24 16:00:41 CST 2017
    Thread[Thread-1,5,main] 服务端响应 : Sat Jun 24 16:00:41 CST 2017
    Thread[Thread-4,5,main] 服务端响应 : Sat Jun 24 16:00:41 CST 2017
    Thread[Thread-6,5,main] 服务端响应 : Sat Jun 24 16:00:41 CST 2017
    Thread[Thread-5,5,main] 服务端响应 : Sat Jun 24 16:00:41 CST 2017
    Thread[Thread-8,5,main] 服务端响应 : Sat Jun 24 16:00:41 CST 2017
    Thread[Thread-2,5,main] 服务端响应 : Sat Jun 24 16:00:41 CST 2017

    啦啦啦

  • 相关阅读:
    当数据库结构改变时,需要将数据库删除再创建
    命名空间“System.Web.Mvc”中不存在类型或命名空间“Ajax”(是否缺少程序集引用?)
    jqGrid 各种参数 详解
    二维数组最小路径和
    动态规划:最大连续子序列和
    最长递增子序列
    java单例模式的几种实现
    java多线程的实现方法
    sleep与wait的区别
    数组旋转
  • 原文地址:https://www.cnblogs.com/ClassNotFoundException/p/7073804.html
Copyright © 2011-2022 走看看