• Undertow服务器基础分析


    我们从名字上就能看出这是一个NIO思想为基础的IO框架,X是指这个框架可以有多种实现,我们可以从代码库 https://github.com/xnio 中发现一个项目xnio-native,里面有用C实现的nio层,就能体会到这个X的含义,可以直接基于操作系统C库。目前在Xnio中默认的实现是nio-impl,也就是JDK的NIO。我们可以认为xnio是在JDK的NIO之上,进行了扩展,融入了一些JBoss开发人员对于并发访问异步通信的理解和思考,总结出一套API,并用基于JDK NIO进行实现。

    要学习XNIO,必须得有JDK NIO的基础知识,本文假设读者已经学习过NIO,如果还没有可以阅读参考书籍[1],[2]。另外对Netty有所了解的话,就会融会贯通,比较容易的理解XNIO的基本原理,因为两个项目有很多相似之处。

    XNIO有两个重要的概念:
    1. Channel,是传输管道的抽象概念,在NIO的Channel上进行的扩展加强,使用ChannelListener API进行事件通知。在创建Channel时,就赋予IO线程,用于执行所有的ChannelListener回调方法。
    2. 区分IO线程和工作线程,创建一个工作线程池可以用来执行阻塞任务。一般情况下,非阻塞的Handler由IO线程执行,而阻塞任务比如Servlet则被调度到工作线程池执行。这样就很好的区分了阻塞和非阻塞的两种情形。

    我们知道NIO的基本要求是不阻塞当前线程的执行,对于非阻塞请求的结果,可以用两种方式获得:一种是对于请求很快返回一个引用(如JDK中Future,XNIO中称为IoFuture,其中很多方法是类似的),过一段时间再查询结果;还有一种是当结果就绪时,调用事先注册的回调方法来通知(如NIO2的CompletionHandler,XNIO的ChannelListener)。显而易见后者效率更高一些,避免了数据未就绪情景下的无用处理过程。但JDK7之前无法将函数作为方法参数,所以只能用Java的匿名内部类来模拟函数式方法,造成代码嵌套层次过多,难以理解和维护,所以Netty和XNIO这样的框架通过调度方法调用过程,简化了编程工作。

    XNIO和Netty的最主要的一个区别是,XNIO继承重用了JDK NIO的ByteBuffer类,而不像Netty另起炉灶,完全重建自己的ByteBuf体系。我们知道NIO的ByteBuffer使用时有个状态切换的过程,读和写要显式的通过调用slice, reset等方法切换,就和unix使用vi编辑器编辑和处理文本需要用'i'和Esc切换状态类似。Netty通过读写指针索引值移除了这个“不便操作”,但XNIO保留了和JDK NIO一致的做法。

    无论NIO还是Netty,都有heap buffer和direct buffer的概念,前者可以认为是byte数组的封装,缓冲区存放在堆上,后者可以直接通过调用操作系统的系统调用在内存上分配缓冲,这样在一些IO操作时,比如从网卡上读出大量数据,再写到硬盘文件中,就不必拷贝数据到应用层,直接在操作系统内核或者驱动上进行数据复制。ByteBuffer的管理和应用是NIO最核心的思想,开发人员应该根据应用类型,对其反复调优,做到占用资源最少和效率最大。

    XNIO和Netty都对ByteBuffer进行池化管理,简单来说就是开发者在程序开始时就计划好读写缓存区大小,统一分配好放到池中,Xnio中有Pool和Pooled接口用来管理池化缓存区。开发过高并发应用就知道,JVM GC经常出现并难以控制是很头疼的问题。我们通常在接收网络数据时,往往简单的new出一块数据区,填充,解析,使用,最后丢弃,这种方法随着大量的数据读入,必然造成GC反复出现。重用缓存区就可以在这个方面解决一部分问题。

    和Netty的ChannelHandler不同,XNIO对应的ChannelListener只有一个方法handleEvent(),也就意味着所有的事件都要经由这个方法。在实际实行过程中,会进行若干状态机的转变,比如在服务器端,开始时accept状态就绪,当连接建立后转变为可读或者可写状态。请参见下面的例子。

    在一些情况下,阻塞的IO调用也是很有用的,比如事务过程中。XNIO也提供了阻塞方法awaitReadable()和awaitWritable()。

    利用Stream channel,可以在数据源头和目的地之间直接读写数据,有一种zero-copy(零拷贝)的方式,即读写过程使用同一块缓存区,这样就不必进行数据拷贝移动过程。XNIO通过封装NIO FileChannel中的方法transferTo和transferFrom来实现。

    还有一种Messgae channel,用来传输帧数据,因为有些报文格式是固定长度或者按照某种已知帧式格式定义的,缓冲区的长度可以按照报文帧来定义,当数据填满后就及时发送,减少了数据长度的计算工作,代码逻辑简洁很多,所以这种channel用于传递'消息',websocket就是这样的。

    阅读XNIO时,有几个词出现频率很高,Source表示信息源头,Sink是信息目的地,Conduit是源头到目的地管道的抽象。

    前面提过,XNIO中有两类线程:
    WORKER_IO_THREADS, IO thread处理非阻塞任务,要保证不做阻塞操作,因为很多连接同时用到这类线程,类似于nodejs中的loop,这个线程只要有任务就去执行,实际配置时每个CPU一个线程比较好。
    WORKER_TASK_CORE_THREADS,用于执行阻塞任务,从线程池中获得,任务完成后返回到线程池中。因为不同应用对应的服务器负载不同,所以不易给出具体数值,一般建议每个CPU core设置10个。

    代码分析,摘自
    https://github.com/ecki/xnio-samples/blob/master/src/main/java/org/xnio/samples/SimpleEchoServer.java

    服务器:

    Java代码  收藏代码
    1. import java.io.IOException;  
    2. import java.net.InetSocketAddress;  
    3. import java.nio.ByteBuffer;  
    4.   
    5. import org.xnio.ChannelListener;  
    6. import org.xnio.IoUtils;  
    7. import org.xnio.OptionMap;  
    8. import org.xnio.Xnio;  
    9. import org.xnio.XnioWorker;  
    10. import org.xnio.channels.AcceptingChannel;  
    11. import org.xnio.channels.Channels;  
    12. import org.xnio.channels.ConnectedStreamChannel;  
    13.   
    14. public final class SimpleEchoServer {  
    15.   
    16.     public static void main(String[] args) throws Exception {  
    17.   
    18.         // 定义读数据listener  
    19.         final ChannelListener<ConnectedStreamChannel> readListener =   
    20.             new ChannelListener<ConnectedStreamChannel>() {  
    21.             public void handleEvent(ConnectedStreamChannel channel) {  
    22.                 //分配缓冲  
    23.                 final ByteBuffer buffer = ByteBuffer.allocate(512);  
    24.                 int res;  
    25.                 try {  
    26.                     while ((res = channel.read(buffer)) > 0) {  
    27.                         //切换到写的状态并用阻塞的方式写回  
    28.                         buffer.flip();  
    29.                         Channels.writeBlocking(channel, buffer);  
    30.                     }  
    31.                     // 保证全部送出  
    32.                     Channels.flushBlocking(channel);  
    33.                     if (res == -1) {  
    34.                         channel.close();  
    35.                     } else {  
    36.                         channel.resumeReads();  
    37.                     }  
    38.                 } catch (IOException e) {  
    39.                     e.printStackTrace();  
    40.                     IoUtils.safeClose(channel);  
    41.                 }  
    42.             }  
    43.         };  
    44.         // 创建接收 listener.  
    45.         final ChannelListener<AcceptingChannel<ConnectedStreamChannel>> acceptListener =   
    46.             new ChannelListener<AcceptingChannel<ConnectedStreamChannel>>() {  
    47.             public void handleEvent(  
    48.                     final AcceptingChannel<ConnectedStreamChannel> channel) {  
    49.                 try {  
    50.                     ConnectedStreamChannel accepted;  
    51.                     // channel就绪,准备接收连接请求  
    52.                     while ((accepted = channel.accept()) != null) {  
    53.                         System.out.println("accepted " + accepted.getPeerAddress());  
    54.                         // 已经连接,设置读数据listener  
    55.                         accepted.getReadSetter().set(readListener);  
    56.                         // 恢复读的状态  
    57.                         accepted.resumeReads();  
    58.                     }  
    59.                 } catch (IOException ignored) {  
    60.                 }  
    61.             }  
    62.         };  
    63.   
    64.         //创建Xnio实例,并构造XnioWorker  
    65.         final XnioWorker worker = Xnio.getInstance().createWorker(OptionMap.EMPTY);  
    66.         // 创建server,在本地12345端口上侦听  
    67.         AcceptingChannel<? extends ConnectedStreamChannel> server = worker  
    68.                 .createStreamServer(new InetSocketAddress(12345),  
    69.                         acceptListener, OptionMap.EMPTY);  
    70.         // 开始接受连接  
    71.         server.resumeAccepts();  
    72.         System.out.println("Listening on " + server.getLocalAddress());  
    73.     }  
    74. }  

     
    客户端:

    Java代码  收藏代码
    1. import java.net.InetSocketAddress;  
    2. import java.nio.ByteBuffer;  
    3. import java.nio.CharBuffer;  
    4. import java.nio.charset.Charset;  
    5.   
    6. import org.xnio.IoFuture;  
    7. import org.xnio.IoUtils;  
    8. import org.xnio.OptionMap;  
    9. import org.xnio.Xnio;  
    10. import org.xnio.XnioWorker;  
    11. import org.xnio.channels.Channels;  
    12. import org.xnio.channels.ConnectedStreamChannel;  
    13.   
    14. public final class SimpleHelloWorldBlockingClient {  
    15.   
    16.     public static void main(String[] args) throws Exception {  
    17.         final Charset charset = Charset.forName("utf-8");  
    18.         //创建Xnio实例,并构造XnioWorker  
    19.         final Xnio xnio = Xnio.getInstance();  
    20.         final XnioWorker worker = xnio.createWorker(OptionMap.EMPTY);  
    21.   
    22.         try {  
    23.             //连接服务器,本地12345端口,注意返回值是IoFuture类型,并不阻塞,返回后可以做些别的事情  
    24.             final IoFuture<ConnectedStreamChannel> futureConnection = worker.connectStream(  
    25.                 new InetSocketAddress("localhost"12345), null, OptionMap.EMPTY);  
    26.             final ConnectedStreamChannel channel = futureConnection.get(); // get是阻塞调用  
    27.             try {  
    28.                 // 发送消息  
    29.                 Channels.writeBlocking(channel, ByteBuffer.wrap("Hello world! ".getBytes(charset)));  
    30.                 // 保证全部送出  
    31.                 Channels.flushBlocking(channel);  
    32.                 // 发送EOF  
    33.                 channel.shutdownWrites();  
    34.                 System.out.println("Sent greeting string! The response is...");  
    35.                 ByteBuffer recvBuf = ByteBuffer.allocate(128);  
    36.                 // 接收消息  
    37.                 while (Channels.readBlocking(channel, recvBuf) != -1) {  
    38.                     recvBuf.flip();  
    39.                     final CharBuffer chars = charset.decode(recvBuf);  
    40.                     System.out.print(chars);  
    41.                     recvBuf.clear();  
    42.                 }  
    43.             } finally {  
    44.                 IoUtils.safeClose(channel);  
    45.             }  
    46.         } finally {  
    47.             worker.shutdown();  
    48.         }  
    49.     }  
    50. }  

     
    [1]: JavaNIO http://www.amazon.com/Java-Nio-Ron-Hitchens/dp/0596002882
    [2]: Pro Java 7 NIO.2 http://www.amazon.com/Pro-Java-NIO-2-Anghel-Leonard/dp/1430240113

  • 相关阅读:
    Windows10关机问题----只有“睡眠”、“更新并重启”、“更新并关机”,但是又不想更新,解决办法
    3ds max学习笔记(九)-- 实例操作(路径阵列)
    3ds max学习笔记(八)-- 实例操作(直行楼梯)
    3ds max学习笔记(七)-- 实例操作(桌子)
    3ds max学习笔记(六)-- 基本操作(建模前奏)
    UE4入门(二)建立和打开项目
    3ds max学习笔记(五)--操作工具
    3ds max 学习笔记(四)--创建物体
    3ds max学习笔记(一)--选择物体
    欧拉回路输出(DFS,不用回溯!)Watchcow POJ 2230
  • 原文地址:https://www.cnblogs.com/jpfss/p/11320744.html
走看看 - 开发者的网上家园