zoukankan      html  css  js  c++  java
  • 说Java网络编程

    基础IO模型

    网络编程的目的在于远程发送数据,发送接收数据就涉及到I/O的操作,这里因为涉及到比较底层字节和字符的操作,所以不可以使用java.nio.file.Files 操作文件。那就先说说I/O吧,I/O流分为字节流和字符流。字节即Byte,包含8位二进制数,一个二进制数就是1bit,中文名称叫位。字符即一个字母或者一个汉字。一个字母由一个字节组成,而汉字根据编码不同由2个或者3个组成。

     Java I/O类的实现原理是装饰者模式,装饰者模式动态地将责任附加到对象上。若要扩展功能,装饰者提供了比继承更有弹性的替代方案。

    1.将被装饰者(Concrete Component)当做类中一个成员变量。
    2.利用构造将被装饰者注入

    然后引入I/O模型:

    阻塞和非阻塞,描述的是结果的请求

    阻塞:在得到结果之前就一直呆在那,啥也不干,此时线程挂起,就如其名,线程被阻塞了。

    非阻塞:如果没得到结果就返回,等一会再去请求,直到得到结果为止。

    异步和同步,描述的是结果的发出,当调用方的请求进来后。

    同步:在没获取到结果前就不返回给调用方,如果调用方是阻塞的,那么调用方就会一直等着。如果调用方是非阻塞的,调用方就会先回去,等一会再来问问得到结果没。

    异步:调用方一来,会直接返回,等执行完实际的逻辑后在通过回调函数把结果返回给调用方。

    异步非阻塞

    用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。

    事实上就是,用户提交IO请求,然后直接返回,并且内核自动完成将数据从内核缓冲区复制到用户缓冲区,完成后再通知用户。

    当然,内核通知我们以后我们还需要执行剩余的操作,但是我们的代码已经继续往下运行了,所以AIO采用了回调的机制,为每个socket注册一个回调事件或者是回调处理器,在处理器中完成数据的操作,也就是内核通知到用户的时候,会自动触发回调函数,完成剩余操作。 这样的方式就是异步的网络编程。

    但是,想要让操作系统支持这样的功能并非易事,windows的IOCP可以支持AIO方式,但是Linux的AIO支持并不是很好。(所以Netty后来也取消了对AIO的支持)

    IO多路复用 :使用IO多路复用器管理socket,由于每个socket是一个文件描述符,操作系统可以维护socket和它的连接状态,一般分为可连接,可读和可写等状态。

    每当用户程序接受到socket请求,将请求托管给多路复用器进行监控,当程序对请求感兴趣的事件发生时,多路复用器以某种方式通知或是用户程序自己轮询请求,以便获取就绪的socket,然后只需使用一个线程进行轮询,多个线程处理就绪请求即可。

    IO多路复用避免了每个socket请求都需要一个线程去处理,而是使用事件驱动的方式,让少数的线程去处理多数socket的IO请求。

    Linux操作系统对IO多路复用提供了较好的支持,select,poll,epoll是Linux提供的支持IO多路复用的API。一般用户程序基于这个API去开发自己的IO复用模型。比如NIO的非阻塞模型,就是采用了IO多路复用的方式,是基于epoll实现的。

    select方式主要是使用数组来存储socket描述符,系统将发生事件的描述符做标记,然后IO复用器在轮询描述符数组的时候,就可以知道哪些请求是就绪了的。缺点是数组的长度只能到1024,并且需要不断地在内核空间和用户空间之间拷贝数组。

    poll方式不采用数组存储描述符,而是使用独立的自定义数据结构pollfd来描述,并且使用id来表示描述符,能支持更多的请求数量,缺点和select方式有点类似,就是轮询的效率很低,并且需要拷贝数据。

     

    当然,上述两种方法适合在请求总数较少,并且活跃请求数较多的情况,这种场景下他们的性能还是不错的。

     

    epoll,epoll函数会在内核空间开辟一个特殊的数据结构,红黑树,树节点中存放的是一个socket描述符以及用户程序感兴趣的事件类型。同时epoll还会维护一个链表。用于存储已经就绪的socket描述符节点。由Linux内核完成对红黑树的维护,当事件到达时,内核将就绪的socket节点加入链表中,用户程序可以直接访问这个链表以便获取就绪的socket。

    有了直接与文件交互的I/O类,那怎么样与网络交互呢?这里就引入Socket

    socket是操作系统提供的网络编程接口,他封装了对于TCP/IP协议栈的支持,用于进程间的通信,当有连接接入主机以后,操作系统自动为其分配一个socket套接字,套接字绑定着一个IP与端口号。通过socket接口,可以获取tcp连接的输入流和输出流,并且通过他们进行读取和写入此操作。

    Java提供了net包用于socket编程,同时支持像Inetaddress,URL等工具类,使用socket绑定一个endpoint(ip+端口号),可以用于客户端的请求处理和发送,使用serversocket绑定本地ip和端口号,可以用于服务端接收TCP请求。

    BIO编程模型

    所谓BIO,就是Block IO,阻塞式的IO。这个阻塞主要发生在:ServerSocket接收请求时(accept()方法)、InputStream、OutputStream(输入输出流的读和写)都是阻塞的。这个可以在下面代码的调试中发现,比如在客户端接收服务器消息的输入流处打上断点,除非服务器发来消息,不然断点是一直停在这个地方的。也就是说这个线程在这时间是被阻塞的。

    这里放一个BIO模型实现的聊天室Demo,对于Socket编程基础请移步:https://www.cnblogs.com/yiwangzhibujian/p/7107785.html

    服务端ChatServer:

     1 /*
     2 1. 功能实现:这个类的作用就像Acceptor。它有两个比较关键的全局变量,一个就是存储在线用户信息的Map,一个就是线程池。
     3 这个类会监听端口,接收客户端的请求,然后为客户端分配工作线程。
     4 还会提供一些常用的工具方法给每个工作线程调用,比如:发送消息、添加在线用户等。
     5 */
     6 
     7 import java.io.*;
     8 import java.net.*;
     9 import java.util.Map;
    10 import java.util.concurrent.*;
    11 
    12 public class ChatServer {
    13     private int DEFAULT_PORT = 8888;
    14     /**
    15      * 创建一个Map存储在线用户的信息。这个map可以统计在线用户、针对这些用户可以转发其他用户发送的消息
    16      * 因为会有多个线程操作这个map,所以为了安全起见用ConcurrentHashMap
    17      * 在这里key就是客户端的端口号,但在实际中肯定不会用端口号区分用户,如果是web的话一般用session。
    18      * value是IO的Writer,用以存储客户端发送的消息
    19      */
    20     private Map<Integer, Writer> map = new ConcurrentHashMap<>();
    21     /**
    22      * 创建线程池,线程上限为10个,如果第11个客户端请求进来,服务器会接收但是不会去分配线程处理它。
    23      * 前10个客户端的聊天记录,它看不见。当有一个客户端下线时,这第11个客户端就会被分配线程,服务器显示在线
    24      * 大家可以把10再设置小一点,测试看看
    25      * */
    26     private ExecutorService executorService = Executors.newFixedThreadPool(10);
    27     //客户端连接时往map添加客户端
    28     public void addClient(Socket socket) throws IOException {
    29         if (socket != null) {
    30             BufferedWriter writer = new BufferedWriter(
    31                     new OutputStreamWriter(socket.getOutputStream())
    32             );
    33             map.put(socket.getPort(), writer);
    34             System.out.println("Client["+socket.getPort()+"]:Online");
    35         }
    36     }
    37 
    38     //断开连接时map里移除客户端
    39     public void removeClient(Socket socket) throws Exception {
    40         if (socket != null) {
    41             if (map.containsKey(socket.getPort())) {
    42                 map.get(socket.getPort()).close();
    43                 map.remove(socket.getPort());
    44             }
    45             System.out.println("Client[" + socket.getPort() + "]Offline");
    46         }
    47     }
    48 
    49     //转发客户端消息,这个方法就是把消息发送给在线的其他的所有客户端
    50     public void sendMessage(Socket socket, String msg) throws IOException {
    51         //遍历在线客户端
    52         for (Integer port : map.keySet()) {
    53             //发送给在线的其他客户端
    54             if (port != socket.getPort()) {
    55                 Writer writer = map.get(port);
    56                 writer.write(msg);
    57                 writer.flush();
    58             }
    59         }
    60     }
    61 
    62     //接收客户端请求,并分配Handler去处理请求
    63     public void start() {
    64         try (ServerSocket serverSocket = new ServerSocket(DEFAULT_PORT)) {
    65             System.out.println("Server Start,The Port is:"+DEFAULT_PORT);
    66             while (true){
    67                 //等待客户端连接
    68                 Socket socket=serverSocket.accept();
    69                 //为客户端分配一个ChatHandler线程
    70                 executorService.execute(new ChatHandler(this, socket));
    71             }
    72         } catch (IOException e) {
    73             e.printStackTrace();
    74         }
    75     }
    76 
    77     public static void main(String[] args) {
    78         ChatServer server=new ChatServer();
    79         server.start();
    80     }
    81 }
    View Code

    服务端的ChatHandler:

     1 /*
     2 1. 功能实现:这个类就是工作线程的类。在这个项目中,它的工作很简单:
     3 把接收到的消息转发给其他客户端,当然还有一些小功能,比如添加移除在线用户。
     4 */
     5 
     6 import java.io.*;
     7 import java.net.*;
     8 
     9 public class ChatHandler implements Runnable {
    10     private ChatServer server;
    11     private Socket socket;
    12 
    13     //构造函数,ChatServer通过这个分配Handler线程
    14     public ChatHandler(ChatServer server, Socket socket) {
    15         this.server = server;
    16         this.socket = socket;
    17     }
    18 
    19     @Override
    20     public void run() {
    21         try {
    22             //往map里添加这个客户端
    23             server.addClient(socket);
    24             //读取这个客户端发送的消息
    25             BufferedReader reader = new BufferedReader(
    26                     new InputStreamReader(socket.getInputStream())
    27             );
    28             String msg = null;
    29             while ((msg = reader.readLine()) != null) {
    30                 //这样拼接是为了让其他客户端也能看清是谁发送的消息
    31                 String sendmsg = "Client[" + socket.getPort() + "]:" + msg;
    32                 //服务器打印这个消息
    33                 System.out.println(sendmsg);
    34                 //将收到的消息转发给其他在线客户端
    35                 server.sendMessage(socket, sendmsg + "
    ");
    36                 if (msg.equals("quit")) {
    37                     break;
    38                 }
    39             }
    40         } catch (IOException e) {
    41             e.printStackTrace();
    42         } finally {
    43             //如果用户退出或者发生异常,就在map中移除该客户端
    44             try {
    45                 server.removeClient(socket);
    46             } catch (Exception e) {
    47                 e.printStackTrace();
    48             }
    49         }
    50     }
    51 }
    View Code

    客户端ChatClient:

     1 /*
     2 1. 功能实现:客户端启动类,也就是主线程,会通过Socket和服务器连接。也提供了两个工具方法:发送消息和接收消息。
     3 */
     4 
     5 import java.io.*;
     6 import java.net.*;
     7 
     8 public class ChatClient {
     9     private BufferedReader reader;
    10     private BufferedWriter writer;
    11     private Socket socket;
    12     //发送消息给服务器
    13     public void sendToServer(String msg) throws IOException {
    14         //发送之前,判断socket的输出流是否关闭
    15         if (!socket.isOutputShutdown()) {
    16             //如果没有关闭就把用户键入的消息放到writer里面
    17             writer.write(msg + "
    ");
    18             writer.flush();
    19         }
    20     }
    21     //从服务器接收消息
    22     public String receive() throws IOException {
    23         String msg = null;
    24         //判断socket的输入流是否关闭
    25         if (!socket.isInputShutdown()) {
    26             //没有关闭的话就可以通过reader读取服务器发送来的消息。注意:如果没有读取到消息线程会阻塞在这里
    27             msg = reader.readLine();
    28         }
    29         return msg;
    30     }
    31 
    32     public void start() {
    33         //和服务创建连接
    34         try {
    35             socket = new Socket("127.0.0.1", 8888);
    36             reader=new BufferedReader(
    37                     new InputStreamReader(socket.getInputStream())
    38             );
    39             writer=new BufferedWriter(
    40                     new OutputStreamWriter(socket.getOutputStream())
    41             );
    42             //新建一个线程去监听用户输入的消息
    43             new Thread(new UserInputHandler(this)).start();
    44             /**
    45              * 不停的读取服务器转发的其他客户端的信息
    46              * 记录一下之前踩过的小坑:
    47              * 这里一定要创建一个msg接收信息,如果直接用receive()方法判断和输出receive()的话会造成有的消息不会显示
    48              * 因为receive()获取时,在返回之前是阻塞的,一旦接收到消息才会返回,也就是while这里是阻塞的,一旦有消息就会进入到while里面
    49              * 这时候如果输出的是receive(),那么上次获取的信息就会丢失,然后阻塞在System.out.println
    50              * */
    51             String msg=null;
    52             while ((msg=receive())!=null){
    53                 System.out.println(msg);
    54             }
    55         } catch (IOException e) {
    56             e.printStackTrace();
    57         }finally {
    58             try {
    59                 if(writer!=null){
    60                     writer.close();
    61                 }
    62             } catch (IOException e) {
    63                 e.printStackTrace();
    64             }
    65         }
    66     }
    67 
    68     public static void main(String[] args) {
    69         new ChatClient().start();
    70     }
    71 }
    View Code

    客户端UserInputHandler:

     1 /*
     2 1. 功能实现:专门负责等待用户输入信息的线程,一旦有信息键入,就马上发送给服务器。
     3 */
     4 
     5 import java.io.*;
     6 
     7 public class UserInputHandler implements Runnable {
     8     private ChatClient client;
     9 
    10     public UserInputHandler(ChatClient client) {
    11         this.client = client;
    12     }
    13 
    14     @Override
    15     public void run() {
    16         try {
    17             //接收用户输入的消息
    18             BufferedReader reader = new BufferedReader(
    19                     new InputStreamReader(System.in)
    20             );
    21             //不停的获取reader中的System.in,实现了等待用户输入的效果
    22             while (true) {
    23                 String input = reader.readLine();
    24                 //向服务器发送消息
    25                 client.sendToServer(input);
    26                 if (input.equals("quit"))
    27                     break;
    28             }
    29         } catch (IOException e) {
    30             e.printStackTrace();
    31         }
    32     }
    33 }
    View Code

    这里有一个比较难解决的坑填一下:当Socket关闭的时候,服务端就会收到响应的关闭信号,那么服务端也就知道流已经关闭了,这个时候读取操作完成,就可以继续后续工作。比如利用BufferedReader.readerLine():

    • 误以为readLine()是读取到没有数据时就返回null(因为其它read方法当读到没有数据时返回-1),而实际上readLine()是一个阻塞函数,当没有数据读取时,就一直会阻塞在那,而不是返回null;因为readLine()阻塞后,System.out.println(message)这句根本就不会执行到,所以在接收端就不会有东西输出。要想执行到System.out.println(message),一个办法是发送完数据后就关掉流,这样readLine()结束阻塞状态,而能够得到正确的结果,但显然不能传一行就关一次数据流;另外一个办法是把System.out.println(message)放到while循环体内就可以。
    • readLine()只有在数据流发生异常或者另一端被close()掉时,才会返回null值。
    • 如果不指定buffer大小,则readLine()使用的buffer有8192个字符。在达到buffer大小之前,只有遇到"/r"、"/n"、"/r/n"才会返回。

    告知对方已发送完命令:
    1. 通过socket关闭,socket.close()。
    2. 通过socket关闭输出流,socket.shutdownOutput()。
    3. 这里也可以采用约定符号告知对方已发送完命令。
    4. 根据长度界定:送一次消息变成了两个步骤:  1. 发送消息的长度   2. 发送消息

    如果你了解一点class文件的结构(后续会写,敬请期待),那么你就会佩服这么设计方式,也就是说我们可以在此找灵感,就是我们可以先指定后续命令的长度,然后读取指定长度的内容做为客户端发送的消息。

      现在首要的问题就是用几个字节指定长度呢,我们可以算一算:

    • 1个字节:最大256,表示256B
    • 2个字节:最大65536,表示64K
    • 3个字节:最大16777216,表示16M
    • 4个字节:最大4294967296,表示4G
    • 依次类推

      这个时候是不是很纠结,最大的当然是最保险的,但是真的有必要选择最大的吗,其实如果你稍微了解一点UTF-8的编码方式(字符编码后续会写,敬请期待),那么你就应该能想到为什么一定要固定表示长度字节的长度呢,我们可以使用变长方式来表示长度的表示,比如:

    • 第一个字节首位为0:即0XXXXXXX,表示长度就一个字节,最大128,表示128B
    • 第一个字节首位为110,那么附带后面一个字节表示长度:即110XXXXX 10XXXXXX,最大2048,表示2K
    • 第一个字节首位为1110,那么附带后面二个字节表示长度:即110XXXXX 10XXXXXX 10XXXXXX,最大131072,表示128K
    • 依次类推

      上面提到的这种用法适合高富帅的程序员使用,一般呢,如果用作命名发送,两个字节就够了,如果还不放心4个字节基本就能满足你的所有要求,下面的例子我们将采用2个字节表示长度,目的只是给你一种思路,让你知道有这种方式来获取消息的结尾:

    服务端SocketServer:

     1 import java.io.InputStream;
     2 import java.net.ServerSocket;
     3 import java.net.Socket;
     4 
     5 public class SocketServer {
     6   public static void main(String[] args) throws Exception {
     7     // 监听指定的端口
     8     int port = 55533;
     9     ServerSocket server = new ServerSocket(port);
    10 
    11     // server将一直等待连接的到来
    12     System.out.println("server将一直等待连接的到来");
    13     Socket socket = server.accept();
    14     // 建立好连接后,从socket中获取输入流,并建立缓冲区进行读取
    15     InputStream inputStream = socket.getInputStream();
    16     byte[] bytes;
    17     // 因为可以复用Socket且能判断长度,所以可以一个Socket用到底
    18     while (true) {
    19       // 首先读取两个字节表示的长度
    20       int first = inputStream.read();
    21       //如果读取的值为-1 说明到了流的末尾,Socket已经被关闭了,此时将不能再去读取
    22       if(first==-1){
    23         break;
    24       }
    25       int second = inputStream.read();
    26       int length = (first << 8) + second;
    27       // 然后构造一个指定长的byte数组
    28       bytes = new byte[length];
    29       // 然后读取指定长度的消息即可
    30       inputStream.read(bytes);
    31       System.out.println("get message from client: " + new String(bytes, "UTF-8"));
    32     }
    33     inputStream.close();
    34     socket.close();
    35     server.close();
    36   }
    37 }

    客户端SocketClient:

     1 import java.io.OutputStream;
     2 import java.net.Socket;
     3 
     4 public class SocketClient {
     5   public static void main(String args[]) throws Exception {
     6     // 要连接的服务端IP地址和端口
     7     String host = "127.0.0.1";
     8     int port = 55533;
     9     // 与服务端建立连接
    10     Socket socket = new Socket(host, port);
    11     // 建立连接后获得输出流
    12     OutputStream outputStream = socket.getOutputStream();
    13     String message = "你好";
    14     //首先需要计算得知消息的长度
    15     byte[] sendBytes = message.getBytes("UTF-8");
    16     //然后将消息的长度优先发送出去
    17     outputStream.write(sendBytes.length >>8);
    18     outputStream.write(sendBytes.length);
    19     //然后将消息再次发送出去
    20     outputStream.write(sendBytes);
    21     outputStream.flush();
    22     //==========此处重复发送一次,实际项目中为多个命名,此处只为展示用法
    23     message = "第二条消息";
    24     sendBytes = message.getBytes("UTF-8");
    25     outputStream.write(sendBytes.length >>8);
    26     outputStream.write(sendBytes.length);
    27     outputStream.write(sendBytes);
    28     outputStream.flush();
    29     //==========此处重复发送一次,实际项目中为多个命名,此处只为展示用法
    30     message = "the third message!";
    31     sendBytes = message.getBytes("UTF-8");
    32     outputStream.write(sendBytes.length >>8);
    33     outputStream.write(sendBytes.length);
    34     outputStream.write(sendBytes);    
    35     
    36     outputStream.close();
    37     socket.close();
    38   }
    39 }

    NIO编程模型

    NIO包含下面几个核心的组件:

    • Channels

    • Buffers

    • Selectors

    整个NIO体系包含的类远远不止这三个,只能说这三个是NIO体系的“核心API”。

     

    通道

    在Java NIO中,主要使用的通道如下(涵盖了UDP 和 TCP 网络IO,以及文件IO):

    • DatagramChannel

    • SocketChannel

    • FileChannel

    • ServerSocketChannel

    缓冲区

    在Java NIO中使用的核心缓冲区如下(覆盖了通过I/O发送的基本数据类型:byte, char、short, int, long, float, double ,long):

    • ByteBuffer

    • CharBuffer

    • ShortBuffer

    • IntBuffer

    • FloatBuffer

    • DoubleBuffer

    • LongBuffer

    选择器

    Java NIO提供了“选择器”的概念。这是一个可以用于监视多个通道的对象,如数据到达,连接打开等。因此,单线程可以监视多个通道中的数据。

    如果应用程序有多个通道(连接)打开,但每个连接的流量都很低,则可考虑使用它。 例如:在聊天服务器中。

    要使用Selector的话,我们必须把Channel注册到Selector上,然后就可以调用Selector的select()方法。这个方法会进入阻塞,直到有一个channel的状态符合条件。当方法返回后,线程可以处理这些事件。

     

    Buffer(缓冲区)介绍

    Java NIO Buffers用于和NIO Channel交互。 我们从Channel中读取数据到buffers里,从Buffer把数据写入到Channels.

    Buffer本质上就是一块内存区,可以用来写入数据,并在稍后读取出来。这块内存被NIO Buffer包裹起来,对外提供一系列的读写方便开发的接口。

    在Java NIO中使用的核心缓冲区如下(覆盖了通过I/O发送的基本数据类型:byte,char,short,int,long,float,double ,long):

    利用Buffer读写数据,通常遵循四个步骤:

    1. 把数据写入buffer;

    2. 调用flip;

    3. 从Buffer中读取数据;

    4. 调用buffer.clear()或者buffer.compact()。

    当写入数据到buffer中时,buffer会记录已经写入的数据大小。当需要读数据时,通过 flip() 方法把buffer从写模式调整为读模式;在读模式下,可以读取所有已经写入的数据。

    当读取完数据后,需要清空buffer,以满足后续写入操作。清空buffer有两种方式:调用 clear()compact() 方法。clear会清空整个buffer,compact则只清空已读取的数据,未被读取的数据会被移动到buffer的开始位置,写入位置则近跟着未读数据之后

     

    Buffer的容量,位置,上限(Buffer Capacity,Position and Limit)

    Buffer缓冲区实质上就是一块内存,用于写入数据,也供后续再次读取数据。这块内存被NIO Buffer管理,并提供一系列的方法用于更简单的操作这块内存。

    一个Buffer有三个属性是必须掌握的,分别是:

    • capacity容量

    • position位置

    • limit限制

    position和limit的具体含义取决于当前buffer的模式。capacity在两种模式下都表示容量。

    读写模式下position和limit的含义:

     

    容量(Capacity)

    作为一块内存,buffer有一个固定的大小,叫做capacit(容量)。也就是最多只能写入容量值得字节,整形等数据。一旦buffer写满了就需要清空已读数据以便下次继续写入新的数据。

     

    位置(Position)

    当写入数据到Buffer的时候需要从一个确定的位置开始,默认初始化时这个位置position为0,一旦写入了数据比如一个字节,整形数据,那么position的值就会指向数据之后的一个单元,position最大可以到capacity-1.

    当从Buffer读取数据时,也需要从一个确定的位置开始。buffer从写入模式变为读取模式时,position会归零,每次读取后,position向后移动。

     

    上限(Limit)

    在写模式,limit的含义是我们所能写入的最大数据量,它等同于buffer的容量。

    一旦切换到读模式,limit则代表我们所能读取的最大数据量,他的值等同于写模式下position的位置。换句话说,您可以读取与写入数量相同的字节数(限制设置为写入的字节数,由位置标记)。

     

    Buffer的常见方法

    方法介绍
    abstract Object array() 返回支持此缓冲区的数组 (可选操作)
    abstract int arrayOffset() 返回该缓冲区的缓冲区的第一个元素的在数组中的偏移量 (可选操作)
    int capacity() 返回此缓冲区的容量
    Buffer clear() 清除此缓存区。将position = 0;limit = capacity;mark = -1;
    Buffer flip() flip()方法可以吧Buffer从写模式切换到读模式。调用flip方法会把position归零,并设置limit为之前的position的值。 也就是说,现在position代表的是读取位置,limit标示的是已写入的数据位置。
    abstract boolean hasArray() 告诉这个缓冲区是否由可访问的数组支持
    boolean hasRemaining() return position < limit,返回是否还有未读内容
    abstract boolean isDirect() 判断个缓冲区是否为 direct
    abstract boolean isReadOnly() 判断告知这个缓冲区是否是只读的
    int limit() 返回此缓冲区的限制
    Buffer position(int newPosition) 设置这个缓冲区的位置
    int remaining() return limit - position; 返回limit和position之间相对位置差
    Buffer rewind() 把position设为0,mark设为-1,不改变limit的值
    Buffer mark() 将此缓冲区的标记设置在其位置

     

    Buffer的使用方式/方法介绍

    分配缓冲区(Allocating a Buffer)

    为了获得缓冲区对象,我们必须首先分配一个缓冲区。在每个Buffer类中,allocate()方法用于分配缓冲区。

    下面来看看一个示例:CharBuffer分配空间大小为2048个字符。

    1. CharBuffer buf = CharBuffer.allocate(2048);

    写入数据到缓冲区(Writing Data to a Buffer)

    写数据到Buffer有两种方法:

    • 从Channel中写数据到Buffer

    • 手动写数据到Buffer,调用put方法

    下面是一个实例,演示从Channel写数据到Buffer:

    1. int bytesRead = inChannel.read(buf); //read into buffer.

    通过put写数据:

    1. buf.put(127);

    put方法有很多不同版本,对应不同的写数据方法。例如把数据写到特定的位置,或者把一个字节数据写入buffer。看考JavaDoc文档可以查阅的更多数据。

     

    翻转(flip())

    flip()方法可以吧Buffer从写模式切换到读模式。调用flip方法会把position归零,并设置limit为之前的position的值。 也就是说,现在position代表的是读取位置,limit标示的是已写入的数据位置。

     

    从Buffer读取数据(Reading Data from a Buffer)

    从Buffer读数据也有两种方式。

    • 从buffer读数据到channel

    • 从buffer直接读取数据,调用get方法

    读取数据到channel的例子:

    1. int bytesWritten = inChannel.write(buf);

    调用get读取数据的例子:

    1. byte aByte = buf.get();

    get也有诸多版本,对应了不同的读取方式。

     

    rewind()

    Buffer.rewind()方法将position置为0,这样我们可以重复读取buffer中的数据。limit保持不变。

     

    clear() and compact()

    一旦我们从buffer中读取完数据,需要复用buffer为下次写数据做准备。只需要调用clear()或compact()方法。

    如果调用的是clear()方法,position将被设回0,limit被设置成 capacity的值。换句话说,Buffer 被清空了。Buffer中的数据并未清除,只是这些标记告诉我们可以从哪里开始往Buffer里写数据。

    如果Buffer还有一些数据没有读取完,调用clear就会导致这部分数据被“遗忘”,因为我们没有标记这部分数据未读。

    针对这种情况,如果需要保留未读数据,那么可以使用compact。 因此 compact()clear() 的区别就在于: 对未读数据的处理,是保留这部分数据还是一起清空

     

    mark()与reset()方法

    通过调用Buffer.mark()方法,可以标记Buffer中的一个特定position。之后可以通过调用Buffer.reset()方法恢复到这个position。例如:

    1. buffer.mark();

    2. //call buffer.get() a couple of times, e.g. during parsing.

    3. buffer.reset();  //set position back to mark.

    equals() and compareTo()

    可以用eqauls和compareTo比较两个buffer

    equals():

    判断两个buffer相对,需满足:

    • 类型相同

    • buffer中剩余字节数相同

    • 所有剩余字节相等

    从上面的三个条件可以看出,equals只比较buffer中的部分内容,并不会去比较每一个元素。

    compareTo():

    compareTo也是比较buffer中的剩余元素,只不过这个方法适用于比较排序的。

      

    Channel

    Channel(通道)主要用于传输数据,然后从Buffer中写入或读取。它们两个结合起来虽然和流有些相似,但主要有以下几点区别:
      1.流是单向的,可以发现Stream的输入流和输出流是独立的,它们只能输入或输出。而通道既可以读也可以写。
      2.通道本身不能存放数据,只能借助Buffer。
      3.Channel支持异步。

    Channel有如下三个常用的类:FileChannel、SocketChannel、ServerSocketChannel。从名字也可以看出区别,第一个是对文件数据的读写,后面两个则是针对Socket和ServerSocket,这里我们只是用后面两个。

    Selector

    多个Channel可以注册到Selector,就可以直接通过一个Selector管理多个通道。Channel在不同的时间或者不同的事件下有不同的状态,Selector会通过轮询来达到监视的效果,如果查到Channel的状态正好是我们注册时声明的所要监视的状态,我们就可以查出这些通道,然后做相应的处理。这些状态如下:
      1.客户端的SocketChannel和服务器端建立连接,SocketChannel状态就是Connect
      2.服务器端的ServerSocketChannel接收了客户端的请求,ServerSocketChannel状态就是Accept
      3.当SocketChannel有数据可读,那么它们的状态就是Read
      4.当我们需要向Channel中写数据时,那么它们的状态就是Write

    下面继续介绍一些基本的接口操作。

    1. 首先,我们开启一个 Selector。你们爱翻译成选择器也好,多路复用器也好。

      Selector selector = Selector.open();
      
    2. 将 Channel 注册到 Selector 上。前面我们说了,Selector 建立在非阻塞模式之上,所以注册到 Selector 的 Channel 必须要支持非阻塞模式,FileChannel 不支持非阻塞,我们这里讨论最常见的 SocketChannel 和 ServerSocketChannel。

      // 将通道设置为非阻塞模式,因为默认都是阻塞模式的
      channel.configureBlocking(false);
      // 注册
      SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
      

      register 方法的第二个 int 型参数(使用二进制的标记位)用于表明需要监听哪些感兴趣的事件,共以下四种事件:

      • SelectionKey.OP_READ

        对应 00000001,通道中有数据可以进行读取

      • SelectionKey.OP_WRITE

        对应 00000100,可以往通道中写入数据

      • SelectionKey.OP_CONNECT

        对应 00001000,成功建立 TCP 连接

      • SelectionKey.OP_ACCEPT

        对应 00010000,接受 TCP 连接

    3. 我们可以同时监听一个 Channel 中的发生的多个事件,比如我们要监听 ACCEPT 和 READ 事件,那么指定参数为二进制的 00010001 即十进制数值 17 即可。

      注册方法返回值是 SelectionKey 实例,它包含了 Channel 和 Selector 信息,也包括了一个叫做 Interest Set 的信息,即我们设置的我们感兴趣的正在监听的事件集合。

    4. 调用 select() 方法获取通道信息。用于判断是否有我们感兴趣的事件已经发生了。

    操作类型描述所属对象
    OP_READ 1 << 0 读操作 SocketChannel
    OP_WRITE 1 << 2 写操作 SocketChannel
    OP_CONNECT 1 << 3 连接socket操作 SocketChannel
    OP_ACCEPT 1 << 4 接受socket操作 ServerSocketChannel

    Selector 的操作就是以上 3 步,这里来一个简单的示例:

    Selector selector = Selector.open();
    
    channel.configureBlocking(false);
    
    SelectionKey key = channel.register(selector, SelectionKey.OP_READ);
    
    while(true) {
      // 判断是否有事件准备好
      int readyChannels = selector.select();
      if(readyChannels == 0) continue;
    
      // 遍历
      Set<SelectionKey> selectedKeys = selector.selectedKeys();
      Iterator<SelectionKey> keyIterator = selectedKeys.iterator();
      while(keyIterator.hasNext()) {
        SelectionKey key = keyIterator.next();
    
        if(key.isAcceptable()) {
            // a connection was accepted by a ServerSocketChannel.
    
        } else if (key.isConnectable()) {
            // a connection was established with a remote server.
    
        } else if (key.isReadable()) {
            // a channel is ready for reading
    
        } else if (key.isWritable()) {
            // a channel is ready for writing
        }
    
        keyIterator.remove();
      }
    }

    对于 Selector,我们还需要非常熟悉以下几个方法:

    1. select()

      调用此方法,会将上次 select 之后的准备好的 channel 对应的 SelectionKey 复制到 selected set 中。如果没有任何通道准备好,这个方法会阻塞,直到至少有一个通道准备好。

    2. selectNow()

      功能和 select 一样,区别在于如果没有准备好的通道,那么此方法会立即返回 0。

    3. select(long timeout)

      看了前面两个,这个应该很好理解了,如果没有通道准备好,此方法会等待一会

    4. wakeup()

      这个方法是用来唤醒等待在 select() 和 select(timeout) 上的线程的。如果 wakeup() 先被调用,此时没有线程在 select 上阻塞,那么之后的一个 select() 或 select(timeout) 会立即返回,而不会阻塞,当然,它只会作用一次。

     这里放一个NIO模型实现的聊天室Demo:

    服务端ChatServer:

      1 /**
      2  * 1. 功能描述:功能基本上和上面讲的工作流程差不多,还会有一些工具方法,都比较简单,就不多说了,如:转发消息,客户端下线后从在线列表移除客户端等。
      3  */
      4 
      5 import java.io.IOException;
      6 import java.net.*;
      7 import java.nio.*;
      8 import java.nio.channels.*;
      9 import java.nio.charset.Charset;
     10 import java.util.Set;
     11 
     12 public class ChatServer {
     13     //设置缓冲区的大小,这里设置为1024个字节
     14     private static final int BUFFER = 1024;
     15 
     16     //Channel都要配合缓冲区进行读写,所以这里创建一个读缓冲区和一个写缓冲区
     17     //allocate()静态方法就是设置缓存区大小的方法
     18     private ByteBuffer read_buffer = ByteBuffer.allocate(BUFFER);
     19     private ByteBuffer write_buffer = ByteBuffer.allocate(BUFFER);
     20 
     21     //为了监听端口更灵活,再不写死了,用一个构造函数设置需要监听的端口号
     22     private int port;
     23 
     24     public ChatServer(int port) {
     25         this.port = port;
     26     }
     27 
     28     private void start() {
     29         //创建ServerSocketChannel和Selector并打开
     30         try (ServerSocketChannel server = ServerSocketChannel.open();
     31              Selector selector = Selector.open()) {
     32             //【重点,实现NIO编程模型的关键】configureBlocking设置ServerSocketChannel为非阻塞式调用,Channel默认的是阻塞的调用方式
     33             server.configureBlocking(false);
     34             //绑定监听端口,这里不是给ServerSocketChannel绑定,而是给ServerSocket绑定,socket()就是获取通道原生的ServerSocket或Socket
     35             server.socket().bind(new InetSocketAddress(port));
     36 
     37             //把server注册到Selector并监听Accept事件
     38             server.register(selector, SelectionKey.OP_ACCEPT);
     39             System.out.println("启动服务器,监听端口:" + port);
     40 
     41 
     42             while (true) {
     43                 //select()会返回此时触发了多少个Selector监听的事件
     44                 if(selector.select()>0) {
     45                     //获取这些已经触发的事件,selectedKeys()返回的是触发事件的所有信息
     46                     Set<SelectionKey> selectionKeys = selector.selectedKeys();
     47                     //循环处理这些事件
     48                     for (SelectionKey key : selectionKeys) {
     49                         handles(key, selector);
     50                     }
     51                     //处理完后清空selectedKeys,避免重复处理
     52                     selectionKeys.clear();
     53                 }
     54             }
     55         } catch (IOException e) {
     56             e.printStackTrace();
     57         }
     58     }
     59 
     60     //处理事件的方法
     61     private void handles(SelectionKey key, Selector selector) throws IOException {
     62         //当触发了Accept事件,也就是有客户端请求进来
     63         if (key.isAcceptable()) {
     64             //获取ServerSocketChannel
     65             ServerSocketChannel server = (ServerSocketChannel) key.channel();
     66             //然后通过accept()方法接收客户端的请求,这个方法会返回客户端的SocketChannel,这一步和原生的ServerSocket类似
     67             SocketChannel client = server.accept();
     68             client.configureBlocking(false);
     69 
     70             //把客户端的SocketChannel注册到Selector,并监听Read事件
     71             client.register(selector, SelectionKey.OP_READ);
     72             System.out.println("客户端[" + client.socket().getPort() + "]上线啦!");
     73         }
     74 
     75         //当触发了Read事件,也就是客户端发来了消息
     76         if (key.isReadable()) {
     77             SocketChannel client = (SocketChannel) key.channel();
     78             //获取消息
     79             String msg = receive(client);
     80             System.out.println("客户端[" + client.socket().getPort() + "]:" + msg);
     81             //把消息转发给其他客户端
     82             sendMessage(client, msg, selector);
     83             //判断用户是否退出
     84             if (msg.equals("quit")) {
     85                 //解除该事件的监听
     86                 key.cancel();
     87                 //更新Selector
     88                 selector.wakeup();
     89                 System.out.println("客户端[" + client.socket().getPort() + "]下线了!");
     90             }
     91         }
     92     }
     93 
     94     //编码方式设置为utf-8,下面字符和字符串互转时用得到
     95     private Charset charset = Charset.forName("UTF-8");
     96 
     97     //接收消息的方法
     98     private String receive(SocketChannel client) throws IOException {
     99         //用缓冲区之前先清空一下,避免之前的信息残留
    100         read_buffer.clear();
    101         //把通道里的信息读取到缓冲区,用while循环一直读取,直到读完所有消息。因为没有明确的类似
    这样的结尾,所以要一直读到没有字节为止
    102         while (client.read(read_buffer) > 0) ;
    103         //把消息读取到缓冲区后,需要转换buffer的读写状态,不明白的看看前面的Buffer的讲解
    104         read_buffer.flip();
    105         return String.valueOf(charset.decode(read_buffer));
    106     }
    107 
    108     //转发消息的方法
    109     private void sendMessage(SocketChannel client, String msg, Selector selector) throws IOException {
    110         msg = "客户端[" + client.socket().getPort() + "]:" + msg;
    111         //获取所有客户端,keys()与前面的selectedKeys不同,这个是获取所有已经注册的信息,而selectedKeys获取的是触发了的事件的信息
    112         for (SelectionKey key : selector.keys()) {
    113             //排除服务器和本客户端并且保证key是有效的,isValid()会判断Selector监听是否正常、对应的通道是保持连接的状态等
    114             if (!(key.channel() instanceof ServerSocketChannel) && !client.equals(key.channel()) && key.isValid()) {
    115                 SocketChannel otherClient = (SocketChannel) key.channel();
    116                 write_buffer.clear();
    117                 write_buffer.put(charset.encode(msg));
    118                 write_buffer.flip();
    119                 //把消息写入到缓冲区后,再把缓冲区的内容写到客户端对应的通道中
    120                 while (write_buffer.hasRemaining()) {
    121                     otherClient.write(write_buffer);
    122                 }
    123             }
    124         }
    125     }
    126 
    127     public static void main(String[] args) {
    128         new ChatServer(8888).start();
    129     }
    130 }
    View Code

    客户端ChatClient:

      1 /**
      2  * 1. 功能描述:基本和前两章的BIO、NIO没什么区别,一个线程监听用户输入信息并发送,主线程异步的读取服务器信息。
      3  */
      4 
      5 import java.io.IOException;
      6 import java.net.InetSocketAddress;
      7 import java.nio.ByteBuffer;
      8 import java.nio.channels.ClosedSelectorException;
      9 import java.nio.channels.SelectionKey;
     10 import java.nio.channels.Selector;
     11 import java.nio.channels.SocketChannel;
     12 import java.nio.charset.Charset;
     13 import java.util.Set;
     14 
     15 public class ChatClient {
     16     private static final int BUFFER = 1024;
     17     private ByteBuffer read_buffer = ByteBuffer.allocate(BUFFER);
     18     private ByteBuffer write_buffer = ByteBuffer.allocate(BUFFER);
     19     //声明成全局变量是为了方便下面一些工具方法的调用,就不用try with resource了
     20     private SocketChannel client;
     21     private Selector selector;
     22 
     23     private Charset charset = Charset.forName("UTF-8");
     24 
     25     private void start() {
     26         try  {
     27             client=SocketChannel.open();
     28             selector= Selector.open();
     29             client.configureBlocking(false);
     30             //注册channel,并监听SocketChannel的Connect事件
     31             client.register(selector, SelectionKey.OP_CONNECT);
     32             //请求服务器建立连接
     33             client.connect(new InetSocketAddress("127.0.0.1", 8888));
     34             //和服务器一样,不停的获取触发事件,并做相应的处理
     35             while (true) {
     36                 selector.select();
     37                 Set<SelectionKey> selectionKeys = selector.selectedKeys();
     38                 for (SelectionKey key : selectionKeys) {
     39                     handle(key);
     40                 }
     41                 selectionKeys.clear();
     42             }
     43         } catch (IOException e) {
     44             e.printStackTrace();
     45         }catch (ClosedSelectorException e){
     46             //当用户输入quit时,在send()方法中,selector会被关闭,而在上面的无限while循环中,可能会使用到已经关闭了的selector。
     47             //所以这里捕捉一下异常,做正常退出处理就行了。不会对服务器造成影响
     48         }
     49     }
     50 
     51     private void handle(SelectionKey key) throws IOException {
     52         //当触发connect事件,也就是服务器和客户端建立连接
     53         if (key.isConnectable()) {
     54             SocketChannel client = (SocketChannel) key.channel();
     55             //finishConnect()返回true,说明和服务器已经建立连接。如果是false,说明还在连接中,还没完全连接完成
     56             if(client.finishConnect()){
     57                 //新建一个新线程去等待用户输入
     58                 new Thread(new UserInputHandler(this)).start();
     59             }
     60             //连接建立完成后,注册read事件,开始监听服务器转发的消息
     61             client.register(selector,SelectionKey.OP_READ);
     62         }
     63         //当触发read事件,也就是获取到服务器的转发消息
     64         if(key.isReadable()){
     65             SocketChannel client = (SocketChannel) key.channel();
     66             //获取消息
     67             String msg = receive(client);
     68             System.out.println(msg);
     69             //判断用户是否退出
     70             if (msg.equals("quit")) {
     71                 //解除该事件的监听
     72                 key.cancel();
     73                 //更新Selector
     74                 selector.wakeup();
     75             }
     76         }
     77     }
     78 
     79     //获取消息
     80     private String receive(SocketChannel client) throws IOException {
     81         read_buffer.clear();
     82         while (client.read(read_buffer)>0);
     83         read_buffer.flip();
     84         return String.valueOf(charset.decode(read_buffer));
     85     }
     86 
     87     //发送消息
     88     public void send(String msg) throws IOException{
     89         if(!msg.isEmpty()){
     90             write_buffer.clear();
     91             write_buffer.put(charset.encode(msg));
     92             write_buffer.flip();
     93             while (write_buffer.hasRemaining()){
     94                 client.write(write_buffer);
     95             }
     96             if(msg.equals("quit")){
     97                 selector.close();
     98             }
     99         }
    100     }
    101 
    102     public static void main(String[] args) {
    103         new ChatClient().start();
    104     }
    105 }
    View Code

    客户端UserInputHandler:

     1 /**
     2  * 1. 功能描述:监听用户输入信息的线程。
     3  */
     4 
     5 import java.io.BufferedReader;
     6 import java.io.IOException;
     7 import java.io.InputStreamReader;
     8 
     9 public class UserInputHandler implements Runnable {
    10     ChatClient client;
    11     public UserInputHandler(ChatClient chatClient) {
    12         this.client=chatClient;
    13     }
    14     @Override
    15     public void run() {
    16         BufferedReader read=new BufferedReader(
    17                 new InputStreamReader(System.in)
    18         );
    19         while (true){
    20             try {
    21                 String input=read.readLine();
    22                 client.send(input);
    23                 if(input.equals("quit"))
    24                     break;
    25             } catch (IOException e) {
    26                 e.printStackTrace();
    27             }
    28         }
    29     }
    30 }
    View Code

     

    AIO编程模型

    AIO中的异步操作

    CompletionHandler

    在AIO编程模型中,常用的API,如connect、accept、read、write都是支持异步操作的。当调用这些方法时,可以携带一个CompletionHandler参数,它会提供一些回调函数。这些回调函数包括:

    1.当这些操作成功时你需要怎么做;

    2.如果这些操作失败了你要这么做。关于这个CompletionHandler参数,你只需要写一个类实现CompletionHandler口,并实现里面两个方法就行了。

    那如何在调用connect、accept、read、write这四个方法时,传入CompletionHandler参数从而实现异步呢?下面分别举例这四个方法的使用。

    先说说Socket和ServerSocket,在NIO中,它们变成了通道,配合缓冲区,从而实现了非阻塞。而在AIO中它们变成了异步通道。也就是AsynchronousServerSocketChannel和AsynchronousSocketChannel,下面例子中对象名分别是serverSocket和socket.

    accept:serverSocket.accept(attachment,handler)。handler就是实现了CompletionHandler接口并实现两个回调函数的类,它具体怎么写可以看下面的实战代码。attachment为handler里面可能需要用到的辅助数据,如果没有就填null。

    read:socket.read(buffer,attachment,handler)。buffer是缓冲区,用以存放读取到的信息。后面两个参数和accept一样。

    write:socket.write(buffer,attachment,handler)。和read参数一样。

    connect:socket.connect(address,attachment,handler)。address为服务器的IP和端口,后面两个参数与前几个一样。

    Future

    既然说到了异步操作,除了使用实现CompletionHandler接口的方式,不得不想到Future。客户端逻辑较为简单,如果使用CompletionHandler的话代码反而更复杂,所以下面的实战客户端代码就会使用Future的方式。简单来说,Future表示的是异步操作未来的结果,怎么理解未来。比如,客户端调用read方法获取服务器发来得消息:

    Future<Integer> readResult=clientChannel.read(buffer)

    Integer是read()的返回类型,此时变量readResult实际上并不一定有数据,而是表示read()方法未来的结果,这时候readResult有两个方法,isDone():返回boolean,查看程序是否完成处理,如果返回true,有结果了,这时候可以通过get()获取结果。如果你不事先判断isDone()直接调用get()也行,只不过它是阻塞的。如果你不想阻塞,想在这期间做点什么,就用isDone()。

    还有一个问题:这些handler的方法是在哪个线程执行的?serverSocket.accept这个方法肯定是在主线程里面调用的,而传入的这些回调方法其实是在其他线程执行的。在AIO中,会有一个AsynchronousChannelGroup,它和AsynchronousServerSocketChannel是绑定在一起的,它会为这些异步通道提供系统资源,线程就算其中一种系统资源,所以为了方便理解,我们暂时可以把他看作一个线程池,它会为这些handler分配线程,而不是在主线程中去执行。

    AIO编程模型

    上面只说了些零碎的概念,为了更好的理解,下面讲一讲大概的工作流程(主要针对服务器,客户端逻辑较为简单):

    1.首先做准备工作。跟NIO一样,先要创建好通道,只不过AIO是异步通道。然后创建好AsyncChannelGroup,可以选择自定义线程池。最后把AsyncServerSocket和AsyncChannelGroup绑定在一起,这样处于同一个AsyncChannelGroup里的通道就可以共享系统资源。

    2.最后一步准备工作,创建好handler类,并实现接口和里面两个回调方法。(如图:客户端1对应的handler,里面的回调方法会实现读取消息和转发消息的功能;serverSocket的handler里的回调方法会实现accept功能。)

    3.准备工作完成,当客户端1连接请求进来,客户端会马上回去,ServerSocket的异步方法会在连接成功后把客户端的SocketChannel存进在线用户列表,并利用客户端1的handler开始异步监听客户端1发送的消息。

    4.当客户端1发送消息时,如果上一步中的handler成功监听到,就会回调成功后的回调方法,这个方法里会把这个消息转发给其他客户端。转发完成后,接着利用handler监听客户端1发送的消息。

    epoll原理

    select,poll,epoll都是IO多路复用的机制。I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。

    epoll是Linux下的一种IO多路复用技术,可以非常高效的处理数以百万计的socket句柄。

    在 select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一 个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait() 时便得到通知。(此处去掉了遍历文件描述符,而是通过监听回调的的机制。这正是epoll的魅力所在。) 如果没有大量的idle -connection或者dead-connection,epoll的效率并不会比select/poll高很多,但是当遇到大量的idle- connection,就会发现epoll的效率大大高于select/poll。

    注意:linux下Selector底层是通过epoll来实现的,当创建好epoll句柄后,它就会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

    先看看使用c封装的3个epoll系统调用:

    • int epoll_create(int size) epoll_create建立一个epoll对象。参数size是内核保证能够正确处理的最大句柄数,多于这个最大数时内核可不保证效果。

    • *int epoll_ctl(int epfd, int op, int fd, struct epoll_event event) epoll_ctl可以操作epoll_create创建的epoll,如将socket句柄加入到epoll中让其监控,或把epoll正在监控的某个socket句柄移出epoll。

    • *int epoll_wait(int epfd, struct epoll_event events,int maxevents, int timeout) epoll_wait在调用时,在给定的timeout时间内,所监控的句柄中有事件发生时,就返回用户态的进程。

      大概看看epoll内部是怎么实现的:

    1. epoll初始化时,会向内核注册一个文件系统,用于存储被监控的句柄文件,调用epoll_create时,会在这个文件系统中创建一个file节点。同时epoll会开辟自己的内核高速缓存区,以红黑树的结构保存句柄,以支持快速的查找、插入、删除。还会再建立一个list链表,用于存储准备就绪的事件。

    2. 当执行epoll_ctl时,除了把socket句柄放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪list链表里。所以,当一个socket上有数据到了,内核在把网卡上的数据copy到内核中后,就把socket插入到就绪链表里。

    3. 当epoll_wait调用时,仅仅观察就绪链表里有没有数据,如果有数据就返回,否则就sleep,超时时立刻返回。

      epoll的两种工作模式:

    • LT:level-trigger,水平触发模式,只要某个socket处于readable/writable状态,无论什么时候进行epoll_wait都会返回该socket。
    • ET:edge-trigger,边缘触发模式,只有某个socket从unreadable变为readable或从unwritable变为writable时,epoll_wait才会返回该socket。

    socket读数据

    socket写数据

    最后顺便说下在Linux系统中JDK NIO使用的是 LT ,而Netty epoll使用的是 ET。

    参考

    https://www.cnblogs.com/lbhym/p/12698309.html

    https://github.com/h2pl/Java-Tutorial/tree/master/docs/java/network-programming

    https://www.zhihu.com/question/21383903/answer/64103663

    https://blog.csdn.net/czx2018/article/details/89502699

    https://www.cnblogs.com/snailclimb/p/9086334.html

  • 相关阅读:
    .net开源工作流ccflow从表数据数据源导入设置
    驰骋开源的asp.net工作流程引擎java工作流 2015 正文 驰骋工作流引擎ccflow6的功能列表
    app:clean classes Exception
    Android Couldn't load BaiduMapSDK
    android okvolley框架搭建
    compileDebugJavaWithJavac
    android重复的文件复制APK META-INF许可证错误记录
    android listview多视图嵌套多视图
    通讯录笔记
    面试总结
  • 原文地址:https://www.cnblogs.com/RQfreefly/p/13544850.html
Copyright © 2011-2022 走看看