zoukankan      html  css  js  c++  java
  • 第一章 java nio三大组件与使用姿势

    本案例来源于《netty权威指南》

    一、三大组件

    • Selector:多路复用器。轮询注册在其上的Channel,当发现某个或者多个Channel处于“就绪状态”后(accept接收连接事件、connect连接完成事件、read读事件、write写事件),从阻塞状态返回就绪的Channel的SelectionKey集合,之后进行IO操作。
    • Channel:封装了socket。
      • ServerSocketChannel:封装了ServerSocket,用于accept客户端连接请求;
      • SocketChannel:一对SocketChannel组成一条虚电路,进行读写通信
    • Buffer:用于存取数据,最主要的是ByteBuffer
      • position:下一个将被操作的字节位置
      • limit:在写模式下表示可以进行写的字节数,在读模式下表示可以进行读的字节数
      • capacity:Buffer的大小

    二、服务端代码

    1、服务端启动类

    1 public class Server {
    2     public static void main(String[] args) throws IOException {
    3         new Thread(new ServerHandler(8080), "server-1").start();
    4     }
    5 }

    创建一个任务ServerHandler,然后创建一条线程,启动执行该任务。

    2、逻辑处理类

     1 public class ServerHandler implements Runnable {
     2     private Selector selector;
     3     private ServerSocketChannel ssChannel;
     4 
     5     public ServerHandler(int port) {
     6         try {
     7             //等价于 Selector selector = SelectorProvider.provider().openSelector();
     8             selector = Selector.open();
     9             //等价于 SelectorProvider.provider().openServerSocketChannel()
    10             ssChannel = ServerSocketChannel.open();
    11             ssChannel.configureBlocking(false);
    12             ssChannel.bind(new InetSocketAddress(port), 1024);
    13             ssChannel.register(selector, SelectionKey.OP_ACCEPT);
    14         } catch (IOException e) {
    15             e.printStackTrace();
    16             System.exit(1);
    17         }
    18     }
    19 
    20     public void run() {
    21         for (; ; ) {
    22             try {
    23                 selector.select();
    24                 Iterator<SelectionKey> it = selector.selectedKeys().iterator();
    25                 while (it.hasNext()) {
    26                     SelectionKey key = it.next();
    27                     it.remove();
    28                     handleInput(key);
    29                 }
    30             } catch (Throwable t) {
    31                 t.printStackTrace();
    32             }
    33         }
    34 
    35     }
    36 
    37     private void handleInput(SelectionKey key) throws IOException {
    38         if (key.isValid()) {
    39             // 处理新接入的请求消息
    40             if (key.isAcceptable()) {
    41                 // Accept the new connection
    42                 ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
    43                 SocketChannel sc = ssc.accept();
    44                 sc.configureBlocking(false);
    45                 // Add the new connection to the selector
    46                 sc.register(selector, SelectionKey.OP_READ);
    47             }
    48             if (key.isReadable()) {
    49                 // Read the data
    50                 SocketChannel sc = (SocketChannel) key.channel();
    51                 ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    52                 int readBytes = sc.read(readBuffer);
    53                 if (readBytes > 0) {
    54                     readBuffer.flip();
    55                     byte[] bytes = new byte[readBuffer.remaining()];
    56                     readBuffer.get(bytes);
    57                     String body = new String(bytes, "UTF-8");
    58                     System.out.println("The time server receive order : "
    59                             + body);
    60                     String currentTime = "QUERY TIME ORDER"
    61                             .equalsIgnoreCase(body) ? new java.util.Date(
    62                             System.currentTimeMillis()).toString()
    63                             : "BAD ORDER";
    64                     doWrite(sc, currentTime);
    65                 } else if (readBytes < 0) {
    66                     // 对端链路关闭
    67                     key.cancel();
    68                     sc.close();
    69                 } else
    70                     ; // 读到0字节,忽略
    71             }
    72         }
    73     }
    74 
    75     private void doWrite(SocketChannel channel, String response)
    76             throws IOException {
    77         if (response != null && response.trim().length() > 0) {
    78             byte[] bytes = response.getBytes();
    79             ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
    80             writeBuffer.put(bytes);
    81             writeBuffer.flip();
    82             channel.write(writeBuffer);
    83         }
    84     }
    85 }

    步骤:

    1、创建一个Selector和ServerSocketChannel实例

    2、配置ServerSocketChannel实例为非阻塞

    3、ServerSocketChannel实例bind端口

    4、将ServerSocketChannel实例注册到selector上,监听OP_ACCEPT事件

    下面的任务在Server创建的新的线程中执行,不影响主线程执行其他逻辑

    5、之后进入死循环

    5.1、使用select.select()阻塞等待就绪事件(这里是等待OP_ACCEPT事件),一旦有有就绪事件到达,立即向下执行

    5.2、使用selector.selectedKeys()获取已经就绪的SelectionKey(即OP_ACCEPT/OP_CONECT/OP_READ/OP_WRITE)集合,之后循环遍历

    5.3、从迭代器删除该SelectionKey,防止下一次再被遍历到

    5.4、如果SelectionKey==OP_ACCEPT,则通过ServerSocketChannel.accept()创建SocketChannel,该SocketChannel是后续真正的与客户端的SocketChannel进行通信的实体

    5.5、配置新创建的SocketChannel实例为非阻塞,然后将该SocketChannel实例注册到selector实例上,监听OP_READ事件

    5.6、等客户端发出请求数据时,此处监听到SelectionKey==OP_READ,则创建ByteBuffer实例,将SocketChannel中的数据读取到ByteBuffer中,然后再创建ByteBuffer将信息写回到SocketChannel(也就是说数据的读写一定要通过Buffer)

    三、客户端代码

    1、客户端启动类

    1 public class Client {
    2     public static void main(String[] args) {
    3         new Thread(new ClientHandler("127.0.0.1", 8080), "client-1").start();
    4     }
    5 }

    创建一个任务ClientHandler,然后创建一条线程,启动执行该任务。

    2、逻辑处理类

     1 public class ClientHandler implements Runnable {
     2     private String host;
     3     private int port;
     4 
     5     private Selector selector;
     6     private SocketChannel socketChannel;
     7 
     8     public ClientHandler(String host, int port) {
     9         this.host = host;
    10         this.port = port;
    11         try {
    12             selector = Selector.open();
    13             socketChannel = SocketChannel.open();
    14             socketChannel.configureBlocking(false);
    15         } catch (IOException e) {
    16             e.printStackTrace();
    17             System.exit(1);
    18         }
    19     }
    20 
    21     public void run() {
    22         try {
    23             doConnect();
    24         } catch (IOException e) {
    25             e.printStackTrace();
    26             System.exit(1);
    27         }
    28         while (true) {
    29             try {
    30                 selector.select();
    31                 Iterator<SelectionKey> it = selector.selectedKeys().iterator();
    32                 while (it.hasNext()) {
    33                     SelectionKey key = it.next();
    34                     it.remove();
    35                     handleInput(key);
    36                 }
    37             } catch (Exception e) {
    38                 e.printStackTrace();
    39                 System.exit(1);
    40             }
    41         }
    42     }
    43 
    44     private void handleInput(SelectionKey key) throws IOException {
    45         if (key.isValid()) {
    46             // 判断是否连接成功
    47             SocketChannel sc = (SocketChannel) key.channel();
    48             if (key.isConnectable()) {
    49                 if (sc.finishConnect()) {
    50                     sc.register(selector, SelectionKey.OP_READ);
    51                     doWrite(sc);
    52                 } else
    53                     System.exit(1);// 连接失败,进程退出
    54             } else if (key.isReadable()) {
    55                 ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    56                 int readBytes = sc.read(readBuffer);
    57                 if (readBytes > 0) {
    58                     readBuffer.flip();
    59                     byte[] bytes = new byte[readBuffer.remaining()];
    60                     readBuffer.get(bytes);
    61                     String body = new String(bytes, "UTF-8");
    62                     System.out.println("Now is : " + body);
    63                 } else if (readBytes < 0) {
    64                     // 对端链路关闭
    65                     key.cancel();
    66                     sc.close();
    67                 } else
    68                     ; // 读到0字节,忽略
    69             }
    70         }
    71 
    72     }
    73 
    74     private void doConnect() throws IOException {
    75         // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
    76         if (socketChannel.connect(new InetSocketAddress(host, port))) {
    77             socketChannel.register(selector, SelectionKey.OP_READ);
    78             doWrite(socketChannel);
    79         } else
    80             socketChannel.register(selector, SelectionKey.OP_CONNECT);
    81     }
    82 
    83     private void doWrite(SocketChannel sc) throws IOException {
    84         byte[] req = "QUERY TIME ORDER".getBytes();
    85         ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
    86         writeBuffer.put(req);
    87         writeBuffer.flip();
    88         sc.write(writeBuffer);
    89         if (!writeBuffer.hasRemaining())
    90             System.out.println("Send order 2 server succeed.");
    91     }
    92 }

    步骤:

    1、创建一个Selector和SocketChannel实例

    2、配置SocketChannel实例为非阻塞

    下面的任务在Cilent创建的新的线程中执行,不影响主线程执行其他逻辑

    3、SocketChannel.connect连接到server端,如果连接没有马上成功,将该SocketChannel实例注册到selector上,监听OP_CONNECT事件;如果连接成功,将该SocketChannel实例注册到selector上,监听OP_READ事件,之后写数据给server端

    4、之后进入死循环

    4.1、使用select.select()阻塞等待就绪事件,一旦有有就绪事件到达,立即向下执行

    4.2、使用selector.selectedKeys()获取已经就绪的SelectionKey(即OP_ACCEPT/OP_CONECT/OP_READ/OP_WRITE)集合,之后循环遍历

    4.3、从迭代器删除该SelectionKey,防止下一次再被遍历到

    4.4、如果SelectionKey==OP_CONNECT,将该SocketChannel实例注册到selector上,监听OP_READ事件,之后写数据给server端;如果监听到SelectionKey==OP_READ,则创建ByteBuffer实例,将SocketChannel中的数据读取到ByteBuffer中

  • 相关阅读:
    设计模式之-工厂方法模式
    设计模式之-简单工厂模式
    设计模式之-单例模式
    Ubuntu-18.04 下使用Nginx搭建高可用,高并发的asp.net core集群
    Ubuntu-18.04 下修改root用户密码,安装SSH服务,允许root用户远程登录,安装vsftp服务器
    ASP.NET Core 系列[1]:ASP.NET Core 初识
    .net core系列之《将.net core应用部署到Ubuntu》
    动态内存分配函数
    C++ sort()对结构体排序
    STL
  • 原文地址:https://www.cnblogs.com/java-zhao/p/9195943.html
Copyright © 2011-2022 走看看