一、概念
在传统的java网络编程中,都是在服务端创建一个ServerSocket,然后为每一个客户端单独创建一个线程Thread分别处理各自的请求,由于对于CPU而言,线程的开销是很大的,无限创建线程会让操作系统崩溃,因此,比较好的方法是在系统启动的时候创建一个动态的线程池,例如鼎鼎大名的服务器Tomcat,就是采用这种解决方案,然而,这种解决方案在高并发的情况下,情况就不太乐观了,当线程池大小超过CPU瓶颈的时候,相应速度,就极其低下了。
传统的java网络编程的结构图如下
NIO采用了通道Channel和选择器Selector的核心对象,Select 机制,不用为每一个客户端连接新启线程处理,而是将其注册到特定的Selector 对象上,这就可以在单线程中利用Selector 对象管理大量并发的网络连接,更好的利用了系统资源;采用非阻塞I/O 的通信方式,不要求阻塞等待I/O 操作完成即可返回,从而减少了管理I/O 连接导致的系统开销,大幅度提高了系统性能。
当有读或写等任何注册的事件发生时,可以从Selector 中获得相应的SelectionKey , 从SelectionKey 中可以找到发生的事件和该事件所发生的具体的SelectableChannel,以获得客户端发送过来的数据。由于在非阻塞网络I/O 中采用了事件触发机制,处理程序可以得到系统的主动通知,从而可以实现底层网络I/O 无阻塞、流畅地读写,而不像在原来的阻塞模式下处理程序需要不断循环等待。使用NIO,可以编写出性能更好、更易扩展的并发型服务器程序。
NIO的结构如下
由此可见,服务端最少只需要一个线程,既可以处理所有客户端Socket
NIO的设计原理
设计原理有点像设计模式中的观察者模式,由Selector去轮流咨询各个SocketChannel通道是否有事件发生,如果有,则选择出所有的Key集合,然后传递给处理程序。我们通过每个key就可以获取客户端的SocketChannel,从而进行通信。
如果Selector发现所有通道都没有事件发生,则线程进入睡眠状态Sleep,阻塞。等到客户端有事件发生,会自动唤醒wakeup选择器selector,是不是有点类似观察者模式!!!!
下面以两个例子来说明,工程目录如下:
虽然是两个例子,但是代码都放在了一个工程里面,下面将分开介绍
二、例子1
1、DataPacket类
该类是服务端和客户端传输的数据包
package com.nio; import java.io.Serializable; import java.util.Date; /** * 数据包 * @author Administrator * */ public class DataPacket implements Serializable{ private long id; private String content; private Date sendTime; public long getId() { return id; } public void setId(long id) { this.id = id; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } public Date getSendTime() { return sendTime; } public void setSendTime(Date sendTime) { this.sendTime = sendTime; } }
2、服务端
NIOServer,服务端,接收客户端发送过来的数据,并将接受到的数据再发送到客户端
package com.nio; import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; public class NIOServer { private Selector selector; private ServerSocketChannel serverSocketChannel; private ServerSocket serverSocket; private static int PORT; private static int BUFFER_SIZE; private ByteBuffer buf; /** * 服务器构造 * @param port * @param buffersize */ public NIOServer(int port,int buffersize){ this.PORT=port; this.BUFFER_SIZE=buffersize; buf=ByteBuffer.allocate(BUFFER_SIZE); } /** * 启动监听服务 * @throws Exception */ public void startListen() throws Exception{ //打开选择器 selector=Selector.open(); //打开服务通道 serverSocketChannel=ServerSocketChannel.open(); //将服务通道设置为非阻塞 serverSocketChannel.configureBlocking(false); //创建服务端Socket serverSocket=serverSocketChannel.socket(); //服务端socket绑定端口 serverSocket.bind(new InetSocketAddress(PORT)); //服务端通道注册链接事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("端口注册完毕"); Iterator<SelectionKey> iterator=null; SelectionKey selectionKey=null; while(true){ //选择一批选择键(线程在此阻塞) selector.select(); iterator=selector.selectedKeys().iterator(); while(iterator.hasNext()){ //selectionKey里包含了客户端发送过来的信息 selectionKey=iterator.next(); this.handleKey(selectionKey); iterator.remove(); } } } /** * 处理选择的键 * @param selectionKey * @throws Exception */ @SuppressWarnings("unused") private void handleKey(SelectionKey selectionKey)throws Exception{ //如果是链接事件 if(selectionKey.isAcceptable()){ //链接客户端通道(非阻塞) SocketChannel socketChannel=this.serverSocketChannel.accept(); //设置客户端通道(非阻塞) socketChannel.configureBlocking(false); //注册读事件 socketChannel.register(selector, SelectionKey.OP_READ); System.out.println("有新链接"); } //如果是读信息事件 else if(selectionKey.isReadable()){ //获取客户端socket通道 SocketChannel socketChannel=(SocketChannel)selectionKey.channel(); //清空缓冲区 buf.clear(); //读取数据到缓冲区,并返回读取的字节数 int a=socketChannel.read(buf); if(a>0){ //将开始指针指向0;把结束指针指向实际有效位置 buf.flip(); //得到的b数据组大小 byte[] b=new byte[buf.limit()]; //取的时实际有效的数据 buf.get(b,buf.position(),buf.limit()); //ObjectInputStream 不能直接接受byte数组,所以先转换成ByteArrayInputStream ByteArrayInputStream byteIn=new ByteArrayInputStream(b); ObjectInputStream objIn=new ObjectInputStream(byteIn); DataPacket dataPacket=(DataPacket) objIn.readObject(); objIn.close(); byteIn.close(); System.out.println("从客户端发送到服务端:"+dataPacket.getContent()); System.out.println("接收时间:"+dataPacket.getSendTime().toLocaleString()); buf.flip(); //将发过来的数据再发送到客户端 socketChannel.write(buf); } else{ //关闭客户端socket通道 socketChannel.close(); } } } }
3、客户端
NIOClient,客户端输入提示字符按回车,只要不是null都将信息发送到服务端,并监听客户端传过来的数据
package com.nio; import java.io.BufferedReader; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStreamReader; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.util.Date; /** * * @author Administrator * */ public class NIOClient { public static void main(String[] args){ try { SocketAddress address=new InetSocketAddress("127.0.0.1",9999); //客户端通道打开的时候要指向一个地址和端口 SocketChannel clientChannel=SocketChannel.open(address); clientChannel.configureBlocking(false); ByteBuffer buf=ByteBuffer.allocate(1024); while(true){ buf.clear(); System.out.println("请输入发送数据包:"); //把输入的字节流转换成字符串 String msg=new BufferedReader(new InputStreamReader(System.in)).readLine(); if(msg.equals("null")){ break; } DataPacket dataPacket=new DataPacket(); dataPacket.setContent("I am hzb"); dataPacket.setSendTime(new Date()); dataPacket.setId(1); ByteArrayOutputStream baos=new ByteArrayOutputStream(); ObjectOutputStream oos=new ObjectOutputStream(baos); //把对象写入了oos流里面,但是没有到缓冲 oos.writeObject(dataPacket); //把流的数据写入到缓冲区 buf.put(baos.toByteArray()); buf.flip(); //把缓冲区里面的数据写到通道里面 clientChannel.write(buf); System.out.println("客户端发送数据:"+msg); while(true){ int len=clientChannel.read(buf); if(len>0){ buf.flip(); byte[] b=new byte[buf.limit()]; buf.get(b,buf.position(),buf.limit()); //注意:如果想要把服务端传过来的数据还原成对像,需要用 //ByteArrayInputStream byteIn=new ByteArrayInputStream(b); //ObjectInputStream objIn=new ObjectInputStream(byteIn); //DataPacket dataPacket=(DataPacket) objIn.readObject(); System.out.println("服务端传来数据:"+new String(b,"utf-8")); break; } } } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
三、例子2
功能是,客户端将F:/work/nioSample/fileTest/client/client_send.txt发送给服务端,服务端接收到后存成F:/work/nioSample/fileTest/server/server_receive.txt
然后,服务端将F:/work/nioSample/fileTest/server/server_send.txt发送给客户端,客户端接收到后存成F:/work/nioSample/fileTest/client/client_receive.txt
1、服务端
package com.nio; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.logging.Level; import java.util.logging.Logger; public class NIOFileServer { private final static Logger logger = Logger.getLogger(NIOFileServer.class.getName()); /** * 主方法 * @param args */ public static void main(String[] args){ Selector selector=null; ServerSocketChannel serverSocketChannel=null; try { selector=Selector.open(); serverSocketChannel=ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().setReuseAddress(true); serverSocketChannel.socket().bind(new InetSocketAddress(10000)); //注册链接事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); while(selector.select()>0){ Iterator<SelectionKey> iterator=selector.selectedKeys().iterator(); while(iterator.hasNext()){ SelectionKey key=iterator.next(); iterator.remove(); doiterator((ServerSocketChannel) key.channel()); } } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); logger.log(Level.SEVERE, e.getMessage(),e); }finally{ try { selector.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } try { serverSocketChannel.close(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } /** * * @param serverSocketChannel,定义成final保证在方法内部serverSocketChannel的内容不会被改变 */ private static void doiterator(final ServerSocketChannel serverSocketChannel){ SocketChannel socketChannel=null; try { socketChannel=serverSocketChannel.accept(); receiveFile(socketChannel, new File("F:/work/nioSample/fileTest/server/server_receive.txt")); sendFile(socketChannel, new File("F:/work/nioSample/fileTest/server/server_send.txt")); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } /** * 接收文件 * @param socketChannel * @param file * @throws IOException */ private static void receiveFile(SocketChannel socketChannel,File file) throws IOException{ FileOutputStream fos=null; FileChannel fileChannel=null; try{ //保存文件要保存的路径 fos=new FileOutputStream(file); fileChannel=fos.getChannel(); ByteBuffer buffer=ByteBuffer.allocateDirect(1024); int len=0; //把客户端通道socketChannel的文件读到缓冲区,再从缓冲区写到本地文件通道channel的路径下 while((len=socketChannel.read(buffer))!=-1){ buffer.flip(); if(len>0){ buffer.limit(len); fileChannel.write(buffer); buffer.clear(); } } }catch(Exception ex){ ex.printStackTrace(); }finally{ fos.close(); fileChannel.close(); } } /** * 发送文件 * @param socketChannel * @param file * @throws IOException */ private static void sendFile(SocketChannel socketChannel,File file) throws IOException{ FileInputStream fis=null; FileChannel fileChannel=null; try{ fis=new FileInputStream(file); fileChannel=fis.getChannel(); ByteBuffer buffer=ByteBuffer.allocateDirect(1024); int len=0; while((len=fileChannel.read(buffer))!=-1){ //将buffer的游标position指向0 buffer.rewind(); buffer.limit(len); socketChannel.write(buffer); buffer.clear(); } //防止正在发送的过程中又发送一个文件 socketChannel.socket().shutdownOutput(); }catch(Exception ex){ ex.printStackTrace(); }finally{ fis.close(); fileChannel.close(); } } }
2、客户端
package com.nio; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.SocketChannel; import java.util.logging.Level; import java.util.logging.Logger; public class NIOFileClient { private final static Logger logger = Logger.getLogger(NIOFileClient.class.getName()); public static void main(String[] args) throws Exception { new Thread(new MyRunnable()).start(); } private static final class MyRunnable implements Runnable { public void run() { SocketChannel socketChannel = null; try { socketChannel = SocketChannel.open(); SocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 10000); socketChannel.connect(socketAddress); sendFile(socketChannel, new File("F:/work/nioSample/fileTest/client/client_send.txt")); receiveFile(socketChannel, new File("F:/work/nioSample/fileTest/client/client_receive.txt")); } catch (Exception ex) { logger.log(Level.SEVERE, null, ex); } finally { try { socketChannel.close(); } catch(Exception ex) {} } } private void sendFile(SocketChannel socketChannel, File file) throws IOException { FileInputStream fis = null; FileChannel channel = null; try { fis = new FileInputStream(file); channel = fis.getChannel(); ByteBuffer buffer = ByteBuffer.allocateDirect(1024); int size = 0; while ((size = channel.read(buffer)) != -1) { buffer.rewind(); buffer.limit(size); socketChannel.write(buffer); buffer.clear(); } socketChannel.socket().shutdownOutput(); } finally { try { channel.close(); } catch(Exception ex) {} try { fis.close(); } catch(Exception ex) {} } } private void receiveFile(SocketChannel socketChannel, File file) throws IOException { FileOutputStream fos = null; FileChannel channel = null; try { fos = new FileOutputStream(file); channel = fos.getChannel(); ByteBuffer buffer = ByteBuffer.allocateDirect(1024); int size = 0; while ((size = socketChannel.read(buffer)) != -1) { buffer.flip(); if (size > 0) { buffer.limit(size); channel.write(buffer); buffer.clear(); } } } finally { try { channel.close(); } catch(Exception ex) {} try { fos.close(); } catch(Exception ex) {} } } } }