本章主要介绍Socket的基本概念,传统的同步阻塞式I/O编程,伪异步IO实现,学习NIO的同步非阻塞编程和NIO2.0(AIO)异步非阻塞编程。
目前为止,Java共支持3种网络编程模型:BIO、NIO、AIO: Java BIO : 同步并阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可以通过线程池机制改善。 Java NIO : 同步非阻塞,服务器实现模式为一个请求一个线程,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。 Java AIO(NIO.2) : 异步非阻塞,服务器实现模式为一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理。 BIO、NIO、AIO适用场景分析: BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序直观简单易理解。 NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。 AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。
一:基本概念
Socket又被称为 "套接字" ,应用程序通常都是通过 "套接字" 向网络发出请求和接收请求。Socket和serverSocket类位于java.net包中。ServerSocket用于(Server)服务端,Socket用于
(Client)客户端。当服务端和客户端建立连接后。两端都会产生一个Socket实例,并且是平等的。不管是Socket还是ServerSocket。都是通过操作SocketImpl和其子类完成相关功能。
连接过程四步骤: 1:服务器监听 2:客户端请求 3:服务端连接确认 4:客户端连接确认
二:传统同步阻塞IO实现
服务端ServerSocket:
1 final static int PROT = 8765; 2 3 ServerSocket server = null; 4 5 server = new ServerSocket(PROT); 6 7 Socket socket = server.accept(); //进行阻塞 8 9 new Thread(new ServerHandler(socket)).start(); //服务端运行,等待客户端连接
客户端Socket:
1 final static String ADDRESS = "127.0.0.1"; 2 3 final static int PORT = 8765; 4 5 Socket socket = null; 6 7 socket = new Socket(ADDRESS, PORT); //进行连接
服务端处理器ServerHandler:
1 // 实现Runnable 2 3 private Socket socket ; 4 5 public ServerHandler(Socket socket){ 6 this.socket = socket; 7 } 8 9 //重写run方法: 10 11 @Override 12 public void run() { 13 BufferedReader in = null; 14 PrintWriter out = null; 15 try { 16 in = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); 17 out = new PrintWriter(this.socket.getOutputStream(), true); 18 String body = null; 19 while(true){ 20 body = in.readLine(); 21 if(body == null) break; 22 System.out.println("Server :" + body); 23 out.println("服务器端回送响的应数据."); 24 } 25 } catch (Exception e) { 26 e.printStackTrace(); 27 28 }
三:伪异步实现:
原理:传统的是直接new Thread()来进行运行任务,现在我们直接通过自定义线程池来实现伪异步。
1 //之前服务端运行: 2 3 //新建一个线程执行客户端的任务 4 new Thread(new ServerHandler(socket)).start();
1 // 现在伪异步: 2 3 HandlerExecutorPool executorPool = new HandlerExecutorPool(50, 1000); 4 while(true){ 5 socket = server.accept(); 6 executorPool.execute(new ServerHandler(socket)); 7 }
自定义线程池:HandlerExecutorPool
1 public class HandlerExecutorPool { 2 3 private ExecutorService executor; 4 public HandlerExecutorPool(int maxPoolSize, int queueSize){ 5 this.executor = new ThreadPoolExecutor( 6 Runtime.getRuntime().availableProcessors(), 7 maxPoolSize, 8 120L, 9 TimeUnit.SECONDS, 10 new ArrayBlockingQueue<Runnable>(queueSize)); 11 } 12 13 public void execute(Runnable task){ 14 this.executor.execute(task); 15 } 16 17 }
四:NIO(非阻塞编程)
传统IO和NIO的差异:IO是同步阻塞 NIO是同步非阻塞。 在jdk1.7以后,NIO升级(NIO2.0)AIO,实现了异步非阻塞
传统的IO(BIO)阻塞:在网络应用程序获取网络数据时,如果网络传输数据很慢,那么程序就一直等着,直到传输完毕为止。
NIO:无需等待,直接获取数据,在数据没有传输完毕时,不获取数据,数据暂时放在缓冲区,等传输完毕以后,缓冲区发出通知,客户端获取数据,实现不等待。
基本概念:
Buffer(缓冲区) channel(管道、通道) Selector(选择器,多路复用器)
Buffer注意事项:每次在put(),for循环 之后都要进行flip()复位。要复位下标
Buffer常用方法:
flip()复位:因为buffer和游标类似,每次新增数据之后,它的下标都会自增,如果用for循环遍历时,他只会遍历没有填充的下标的值,所以要用filp()方法复
位。
wrap(数组):wrap方法会包裹一个数组: 一般这种用法不会先初始化缓存对象的长度,因为没有意义,最后还会被wrap所包裹的数组覆盖掉
duplicate(): buffer复制的方法 。一个buffer数据复制给另外一个buffer数组
position(index):设置buffer可读的下标的位置
remaining() :返回buffer可读的长度
get(数组):把buffer数据复制给数组
Channel管道:双向
两大类: 1:网络读写类(SelectableChannel) 2:文件操作类(FileChannel)
我们要使用的SocketChannel和ServerSocketChannel就在SelectableChannel类里面
Selector:选择器(多路复用器)
原理:Selector不断的注册轮询注册在其上的通道(SocketChannel),如果某一个通道发生了读写操作,这个通道就处于就绪状态,会被Selector轮询出
来。然后通过SelectionKey就可以获取到就绪的Channel集合,从而进行后续操作。
四大状态:连接状态 阻塞状态 可读状态 可写状态
下面来看一下程序中是怎么通过这些类库实现Socket功能。
首先介绍一下几个辅助类
辅助类SerializableUtil,这个类用来把java对象序列化成字节数组,或者把字节数组反序列化成java对象。
辅助类MyRequestObject和MyResponseObject,这两个类是普通的java对象,实现了Serializable接口。MyRequestObject类是Client发出的请求,MyResponseObject是Server端作出的响应。
下面主要看一下Server端的代码,其中有一些英文注释对理解代码很有帮助,注释主要是来源jdk的文档和例子,这里就没有再翻译
下面是Client的代码,代码比较简单就是启动了100个线程来访问Server
最后测试上面的代码,首先运行Server类,然后运行Client类,就可以分别在Server端和Client端控制台看到发送或接收到的MyRequestObject或MyResponseObject对象了。
代码实现:
注:转自http://blog.csdn.net/kongxx/article/details/7288896
五:NIO2.0(AIO) 异步非阻塞
AIO编程:在NIO基础上引入异步的通到的概念,实现了异步文件和异步套字节,jdk1.7以后升级。
基本概念
1 AsynchronousChannel:支持异步通道,包括服务端AsynchronousServerSocketChannel和客户端AsynchronousSocketChannel等实现。 2 CompletionHandler:用户处理器。定义了一个用户处理就绪事件的接口,由用户自己实现,异步io的数据就绪后回调该处理器消费或处理数据。 3 AsynchronousChannelGroup:一个用于资源共享的异步通道集合。处理IO事件和分配给CompletionHandler。(具体这块还没细看代码,后续再分析这块)
所谓AIO,就是异步非阻塞IO,是NIO的升级版本,也就是NIO2.0版本,但是与NIO不同,当进行读写操作时,只须直接调用API的read或write方法即可。这两种方法均为异步
的,对于读操作而言,当有流可读取时,操作系统会将可读的流传入read方法的缓冲区,并通知应用程序;对于写操作而言,当操作系统将write方法传递的流写入完毕时,操作
系统主动通知应用程序。 即可以理解为,read/write方法都是异步的,完成后会主动调用回调函数。
具体代码实现:
1 // Server类: 2 3 4 5 /** 6 * 7 *类描述:AIO 服务端 8 *@author: 豪 9 *@date: 日期:2017-5-24 时间:上午10:48:12 10 *@version 1.0 11 */ 12 public class Server { 13 //线程池 14 private ExecutorService executorService; 15 //线程组 16 private AsynchronousChannelGroup threadGroup; 17 //服务器通道 18 public AsynchronousServerSocketChannel assc; 19 20 public Server(int port){ 21 try { 22 //创建一个缓存池 23 executorService = Executors.newCachedThreadPool(); 24 //创建线程组 25 threadGroup = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1); 26 //创建服务器通道 27 assc = AsynchronousServerSocketChannel.open(threadGroup); 28 //进行绑定 29 assc.bind(new InetSocketAddress(port)); 30 31 System.out.println("server start , port : " + port); 32 //进行阻塞 33 assc.accept(this, new ServerCompletionHandler()); 34 //一直阻塞 不让服务器停止 35 Thread.sleep(Integer.MAX_VALUE); 36 37 } catch (Exception e) { 38 e.printStackTrace(); 39 } 40 } 41 42 public static void main(String[] args) { 43 Server server = new Server(8765); 44 } 45 46 }
1 //ServerCompletionHandler类 2 3 4 /** 5 * 6 *类描述:服务端处理类 所有的处理都在此类进行 7 *@author: 豪 8 *@date: 日期:2017-5-24 时间:上午10:47:45 9 *@version 1.0 10 */ 11 public class ServerCompletionHandler implements CompletionHandler<AsynchronousSocketChannel, Server> { 12 13 @Override 14 public void completed(AsynchronousSocketChannel asc, Server attachment) { 15 //当有下一个客户端接入的时候 直接调用Server的accept方法,这样反复执行下去,保证多个客户端都可以阻塞 16 attachment.assc.accept(attachment, this); 17 read(asc); 18 } 19 20 private void read(final AsynchronousSocketChannel asc) { 21 //读取数据 22 ByteBuffer buf = ByteBuffer.allocate(1024); 23 asc.read(buf, buf, new CompletionHandler<Integer, ByteBuffer>() { 24 @Override 25 public void completed(Integer resultSize, ByteBuffer attachment) { 26 //进行读取之后,重置标识位 27 attachment.flip(); 28 //获得读取的字节数 29 System.out.println("Server -> " + "收到客户端的数据长度为:" + resultSize); 30 //获取读取的数据 31 String resultData = new String(attachment.array()).trim(); 32 System.out.println("Server -> " + "收到客户端的数据信息为:" + resultData); 33 String response = "服务器响应, 收到了客户端发来的数据: " + resultData; 34 write(asc, response); 35 } 36 @Override 37 public void failed(Throwable exc, ByteBuffer attachment) { 38 exc.printStackTrace(); 39 } 40 }); 41 } 42 43 private void write(AsynchronousSocketChannel asc, String response) { 44 try { 45 ByteBuffer buf = ByteBuffer.allocate(1024); 46 buf.put(response.getBytes()); 47 buf.flip(); 48 asc.write(buf).get(); 49 } catch (InterruptedException e) { 50 e.printStackTrace(); 51 } catch (ExecutionException e) { 52 e.printStackTrace(); 53 } 54 } 55 56 @Override 57 public void failed(Throwable exc, Server attachment) { 58 exc.printStackTrace(); 59 } 60 61 }
1 //Clinet类: 2 3 /** 4 * 5 *类描述:AIO客户端 6 *@author: 豪 7 *@date: 日期:2017-5-24 时间:上午10:47:23 8 *@version 1.0 9 */ 10 public class Client implements Runnable{ 11 12 private AsynchronousSocketChannel asc ; 13 14 public Client() throws Exception { 15 asc = AsynchronousSocketChannel.open(); 16 } 17 18 public void connect(){ 19 asc.connect(new InetSocketAddress("127.0.0.1", 8765)); 20 } 21 22 public void write(String request){ 23 try { 24 asc.write(ByteBuffer.wrap(request.getBytes())).get(); 25 read(); 26 } catch (Exception e) { 27 e.printStackTrace(); 28 } 29 } 30 31 private void read() { 32 ByteBuffer buf = ByteBuffer.allocate(1024); 33 try { 34 asc.read(buf).get(); 35 buf.flip(); 36 byte[] respByte = new byte[buf.remaining()]; 37 buf.get(respByte); 38 System.out.println(new String(respByte,"utf-8").trim()); 39 } catch (InterruptedException e) { 40 e.printStackTrace(); 41 } catch (ExecutionException e) { 42 e.printStackTrace(); 43 } catch (UnsupportedEncodingException e) { 44 e.printStackTrace(); 45 } 46 } 47 48 @Override 49 public void run() { 50 while(true){ 51 52 } 53 } 54 55 public static void main(String[] args) throws Exception { 56 Client c1 = new Client(); 57 c1.connect(); 58 59 Client c2 = new Client(); 60 c2.connect(); 61 62 Client c3 = new Client(); 63 c3.connect(); 64 65 new Thread(c1, "c1").start(); 66 new Thread(c2, "c2").start(); 67 new Thread(c3, "c3").start(); 68 69 Thread.sleep(1000); 70 71 c1.write("c1 aaa"); 72 c2.write("c2 bbbb"); 73 c3.write("c3 ccccc"); 74 } 75 76 }