zoukankan      html  css  js  c++  java
  • Java NIO 非阻塞Socket服务器构建

    推荐阅读IBM developerWorks中NIO的入门教程,尤其是对块I/O和流I/O不太清楚的开发者。

    说到socket服务器,第一反应是java.net.Socket这个类。事实上在并发和响应时间要求不高的场合,是可以用java.net.Socket来实现的,比如写一个局域网聊天工具、发送文件等。但它的缺点也很明显,需要自行对接受的线程进行维护,管理缓冲区的分配等,我尝试过用java.net.Socket完成一个瞬时负载在千人左右的服务器,却因后期改动和维护异常麻烦而放弃。

    Java自1.4以后,加入了新IO特性,这便是本文要介绍的NIO。下面是一段服务器的示例代码(引用自xpbug的Blog):

      1 public class EchoServer {
      2     public static SelectorLoop connectionBell;
      3     public static SelectorLoop readBell;
      4     public boolean isReadBellRunning=false;
      5  
      6     public static void main(String[] args) throws IOException {
      7         new EchoServer().startServer();
      8     }
      9      
     10     // 启动服务器
     11     public void startServer() throws IOException {
     12         // 准备好一个闹钟.当有链接进来的时候响.
     13         connectionBell = new SelectorLoop();
     14          
     15         // 准备好一个闹装,当有read事件进来的时候响.
     16         readBell = new SelectorLoop();
     17          
     18         // 开启一个server channel来监听
     19         ServerSocketChannel ssc = ServerSocketChannel.open();
     20         // 开启非阻塞模式
     21         ssc.configureBlocking(false);
     22          
     23         ServerSocket socket = ssc.socket();
     24         socket.bind(new InetSocketAddress("localhost",7878));
     25          
     26         // 给闹钟规定好要监听报告的事件,这个闹钟只监听新连接事件.
     27         ssc.register(connectionBell.getSelector(), SelectionKey.OP_ACCEPT);
     28         new Thread(connectionBell).start();
     29     }
     30      
     31     // Selector轮询线程类
     32     public class SelectorLoop implements Runnable {
     33         private Selector selector;
     34         private ByteBuffer temp = ByteBuffer.allocate(1024);
     35          
     36         public SelectorLoop() throws IOException {
     37             this.selector = Selector.open();
     38         }
     39          
     40         public Selector getSelector() {
     41             return this.selector;
     42         }
     43  
     44         @Override
     45         public void run() {
     46             while(true) {
     47                 try {
     48                     // 阻塞,只有当至少一个注册的事件发生的时候才会继续.
     49                     this.selector.select();
     50                      
     51                     Set<SelectionKey> selectKeys = this.selector.selectedKeys();
     52                     Iterator<SelectionKey> it = selectKeys.iterator();
     53                     while (it.hasNext()) {
     54                         SelectionKey key = it.next();
     55                         it.remove();
     56                         // 处理事件. 可以用多线程来处理.
     57                         this.dispatch(key);
     58                     }
     59                 } catch (IOException e) {
     60                     e.printStackTrace();
     61                 } catch (InterruptedException e) {
     62                     e.printStackTrace();
     63                 }
     64             }
     65         }
     66          
     67         public void dispatch(SelectionKey key) throws IOException, InterruptedException {
     68             if (key.isAcceptable()) {
     69                 // 这是一个connection accept事件, 并且这个事件是注册在serversocketchannel上的.
     70                 ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
     71                 // 接受一个连接.
     72                 SocketChannel sc = ssc.accept();
     73                  
     74                 // 对新的连接的channel注册read事件. 使用readBell闹钟.
     75                 sc.configureBlocking(false);
     76                 sc.register(readBell.getSelector(), SelectionKey.OP_READ);
     77                  
     78                 // 如果读取线程还没有启动,那就启动一个读取线程.
     79                 synchronized(EchoServer.this) {
     80                     if (!EchoServer.this.isReadBellRunning) {
     81                         EchoServer.this.isReadBellRunning = true;
     82                         new Thread(readBell).start();
     83                     }
     84                 }
     85                  
     86             } else if (key.isReadable()) {
     87                 // 这是一个read事件,并且这个事件是注册在socketchannel上的.
     88                 SocketChannel sc = (SocketChannel) key.channel();
     89                 // 写数据到buffer
     90                 int count = sc.read(temp);
     91                 if (count < 0) {
     92                     // 客户端已经断开连接.
     93                     key.cancel();
     94                     sc.close();
     95                     return;
     96                 }
     97                 // 切换buffer到读状态,内部指针归位.
     98                 temp.flip();
     99                 String msg = Charset.forName("UTF-8").decode(temp).toString();
    100                 System.out.println("Server received ["+msg+"] from client address:" + sc.getRemoteAddress());
    101                  
    102                 Thread.sleep(1000);
    103                 // echo back.
    104                 sc.write(ByteBuffer.wrap(msg.getBytes(Charset.forName("UTF-8"))));
    105                  
    106                 // 清空buffer
    107                 temp.clear();
    108             }
    109         }
    111     }
    112  
    113 }

     此外,还有一个看上去更“规范”的示例《ServerSocketChannel与SocketChannel的使用》,这里就不再引用了。

    OK,原文的注释已经很详细了,这里进一步解析这段代码。

    首先是java.nio.channels.ServerSocketChannel这个类,引用官方的描述:

    public abstract class ServerSocketChannelextends AbstractSelectableChannel

    A selectable channel for stream-oriented listening sockets.

    Server-socket channels are not a complete abstraction of listening network sockets. Binding and the manipulation of socket options must be done through an associated ServerSocket object obtained by invoking the socket method. It is not possible to create a channel for an arbitrary, pre-existing server socket, nor is it possible to specify the SocketImpl object to be used by a server socket associated with a server-socket channel.

    A server-socket channel is created by invoking the open method of this class. A newly-created server-socket channel is open but not yet bound. An attempt to invoke the accept method of an unbound server-socket channel will cause a NotYetBoundException to be thrown. A server-socket channel can be bound by invoking one of the bind methods of an associated server socket.

    Server-socket channels are safe for use by multiple concurrent threads.

    再看选择器Selector的用法:请戳这里

  • 相关阅读:
    上位机获取Mjpeg视频流程序(C#.NET语言+AForge.NET控件)(待测试)
    C# Ping类的使用
    四旋翼上位机模拟显示四轴状态
    利用Sniffer分析 ARP报文
    C# 基于CSGL opengl
    用C#实现对本机IP地址的设置
    使用grub4dos引导Linux
    几个常用的批处理(DOS指令)的应用
    四旋翼飞行器之OSD(基本完成)
    Posix多线程编程学习笔记(转)
  • 原文地址:https://www.cnblogs.com/gugia/p/4462623.html
Copyright © 2011-2022 走看看