网络编程涉及到两个进程(客户端-服务端)间的通信,服务端提供IP和监听端口,客户端通过连接操作向服务端监听的地址发起请求,如果连接成功(TCP三次握手),双方就可以通过套接字进行通信。
1、BIO 同步阻塞I/O
在BIO同步阻塞模型中,ServerSocket负责绑定IP地址,启动监听端口;Socket负责发起连接操作。连接成功后,双方通过输入和输出流进行同步阻塞式通信。
简单的描述一下BIO的服务端通信模型:
服务端通常由一个独立的acceptor线程T负责监听客户端的连接,T收到客户端连接请求socket后,将它交给一个线程M进行处理,M会从这个socket的inputstream中读取请求,处理完毕后,再将响应值写入这个socket的outputstream中返回,M关闭资源和socket连接后结束自己。
该模型缺乏弹性伸缩能力,当客户端访问量增加后,服务端的线程数与客户端的访问数呈1:1关系,而Java中的线程资源并不是无限制使用的,线程数量快速膨胀后,系统的性能将急剧下降(线程间切换的开销)。
这在需要满足高性能、高并发的场景是没法应用的,因为大量创建新的线程会严重影响服务器性能,甚至罢工。
为了改进这种一连接一线程的模型,我们可以使用线程池来管理线程:实现请求与线程N:M的关系。其实就是线程复用,比如可以使用FixedThreadPool限制线程最大数量,然而,限制了线程数量后,当并发请求时超过最大线程数时,有的请求将一直等待直到线程池中有空闲的线程来处理他。另外线程池中结合阻塞队列可以起到缓冲作用,但是当请求处理速度低于发起速度时,队列中的请求将越来越多,直至队列被填满而阻塞接收线程, 进而阻塞accept操作,accept被阻塞后新来的请求将滞留在tcp的缓存队列中,它最终也将被填满,直至最后tcp直接拒绝请求连接。这样当然比服务直接宕机委婉了很多,但是当发生大量请求时问题依然存在。
BIO示例:
由于Bio的处理代码相对简单一些,就多设计了一下,比如客户端使用栅栏使得客户端线程到达后同时发送请求,服务端重写newTask接口使服务线程能够被中断。
package com.io.Bio; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; import java.util.concurrent.atomic.AtomicInteger; import com.util.IOUtil; /** * Bio客户端<br> * * @author shanhm1991 * @date 2017年9月15日 */ public class BioClient extends Thread { private static volatile AtomicInteger clientRunningNum = new AtomicInteger(0); private static volatile AtomicInteger clientBlockingNum = new AtomicInteger(0); private static volatile AtomicInteger client_index = new AtomicInteger(0); private static Object lock = new Object(); private Socket socket; private String host; private int port; public BioClient(String host, int port) { super("client[" + host + ":" + port + "]-Thread-" + client_index.incrementAndGet()); this.host = host; this.port = port; } @Override public void run() { clientRunningNum.incrementAndGet(); while (true) { if (!isConnectionAlive()) { try { socket = new Socket(host, port); } catch (IOException e) { System.err.println(IOUtil.time() + this.getName() + " 连接服务异常:[" + e.getMessage() + "],中断自己"); interrupt(); return; } } BufferedReader in = null; PrintWriter out = null; String request = IOUtil.buildRequest(); try { out = new PrintWriter(socket.getOutputStream(), true); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); /** * 设了一个阻塞点,每次等所有客户端都到达后一起发送消息。<br> * 但是引出一个问题,在最后关闭的时候需要做对应处理,否则有的线程无限等待,<br> * 当然也可以选择使用ReentrantLock代替,设定最长等待时间以避免这个问题。 */ clientBlockingNum.incrementAndGet(); synchronized (lock) { while (clientBlockingNum.get() % clientRunningNum.get() != 0) { lock.wait(); } clientBlockingNum.set(0); lock.notifyAll(); } System.out.println(IOUtil.time() + this.getName() + " 发送请求:" + request); out.println(request); String response = in.readLine(); System.out.println(IOUtil.time() + this.getName() + " 收到响应:" + response); } catch (InterruptedException e) { interrupt(); // 在等待锁lock的时候可以响应中断 return; } catch (IOException e) { System.err.println(IOUtil.time() + this.getName() + " 异常中断:" + e.getMessage()); interrupt(); return; } finally { IOUtil.close(socket); } // 在Thread.sleep()中检测到中断时停止线程 try { sleep(10); } catch (InterruptedException e) { interrupt(); break; } } } /** * 客户端响应中断,关闭socket,结束自己<br> * 重设等待条件,并通知其他客户端线程不需要再等待自己 */ @Override public void interrupt() { super.interrupt();// 标记中断 IOUtil.close(socket); clientRunningNum.decrementAndGet(); System.err.println(IOUtil.time() + this.getName() + " 被中断,结束自己 " + "clientBlockingNum/clientRunningNum:" + clientBlockingNum.get() + "/" + clientRunningNum.get()); synchronized (lock) { if (clientRunningNum.get() == 0 || clientBlockingNum.get() % clientRunningNum.get() == 0) { lock.notifyAll(); } } } /** * 检查socket实时连接状态 */ private boolean isConnectionAlive() { if (socket == null) { return false; } try { socket.sendUrgentData(0xFF); return true; } catch (Exception e) { IOUtil.close(socket); return false; } } /** * 启动多个客户端,每个客户端启动后自行周期性发送同步消息<br> * * @测试 */ public static void main(String[] args) { for (int i = 1; i <= 20; i++) { new BioClient("127.0.0.1", 4040).start(); } } }
package com.io.Bio; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import com.util.Calculator; import com.util.IOUtil; /** * Bio服务端 * * @author shanhm1991 * @date 2017年9月15日 */ public class BioServer extends Thread { /** * 扩展了ThreadPoolExecutor的newTaskFor方法<br> * 在ThreadPoolExecutor关闭时,newTaskFor会将未处理的任务封装成RunnableFuture返回<br> * 这里扩展的最主要目的是使这个线程池支持由提交的任务来自行实现封装一个可以关闭socket的RunnableFuture */ private static class ExtendExecutor extends ThreadPoolExecutor { public ExtendExecutor(int corePoolSize, int maximumPoolSize, long keepAliveSeconds, int port) { super(corePoolSize, maximumPoolSize, keepAliveSeconds, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(QUEUE_SIZE), new HandleThreadFactory(port), new RejectedHandler()); } @Override protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { if (callable instanceof CancellableCall) { return ((CancellableCall<T>) callable).newTask(); } else { return super.newTaskFor(callable); } } /** * 线程工厂,这里只是为了标识线程的名字 */ private static class HandleThreadFactory implements ThreadFactory { private int port; private AtomicInteger index = new AtomicInteger(0); public HandleThreadFactory(int port) { this.port = port; } @Override public Thread newThread(Runnable r) { return new Thread(r, "server[" + port + "]-pool-handleThread-" + index.incrementAndGet()); } } private static class RejectedHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { //executor.submit(r)直接提交任务可能会导致stackoverfollow异常;<br> // r.run();如果线程池的任务都死了,可以让当前线程来执行,但这里已经想停掉所有线程了,所以没必要了 System.err.println(IOUtil.time() + Thread.currentThread().getName() + " 扔掉一个任务"); } } } /** * CancellableTask接口扩展了Callable, * 增加了一个cancel方法和一个newTask工厂方法来构造RunnableFuture */ private static interface CancellableCall<T> extends Callable<T> { T cancel(); RunnableFuture<T> newTask(); } /** * SocketTask实现了CancellableCall,并定义了Future.cancel。<br> * 如果SocketTask通过其自己的Future来取消,那么底层的套接字将被关闭并且线程将被中断。<br> */ @SuppressWarnings("hiding") private static abstract class SocketTask<HandleResult> implements CancellableCall<HandleResult> { protected Socket socket; protected void closeSocket() { IOUtil.close(socket); } public SocketTask(Socket socket) { this.socket = socket; } @Override public abstract HandleResult cancel(); @Override public RunnableFuture<HandleResult> newTask() { return new FutureTask<HandleResult>(this) { private HandleResult result; @Override public HandleResult get() { return result; } @Override @SuppressWarnings("finally") public boolean cancel(boolean mayInterruptIfRunning) { try { result = SocketTask.this.cancel(); } finally { return super.cancel(mayInterruptIfRunning); } } }; } } /** * 可取消的任务 */ private class HandleTask extends SocketTask<HandleResult> { private final String request; private HandleResult result; public HandleTask(Socket socket, String request) { super(socket); this.request = request; result = new HandleResult(request); } @Override public HandleResult cancel() { System.err .println(IOUtil.time() + Thread.currentThread().getName() + " 取消在等待被执行的任务:" + result.getRequest()); closeSocket(); return result; } @Override public HandleResult call() { PrintWriter out = null; try { out = new PrintWriter(socket.getOutputStream(), true); String response; System.out.println(IOUtil.time() + Thread.currentThread().getName() + " 处理请求:" + request); long beginTime = System.currentTimeMillis(); try { response = String.valueOf(Calculator.conversion(request)); } catch (final Exception e) { response = "server exception:" + e.getMessage(); } for (int i = 0; i < 8; i++) { for (int m = 0; m < 999; m++) { for (int n = 0; n < 99999; n++) { if (Thread.interrupted()) { closeSocket(); System.err.println(IOUtil.time() + Thread.currentThread().getName() + " 执行过程中强行中断:" + result.getRequest()); return result; } } } } out.println(response); result.setResult(response); System.out.println(IOUtil.time() + Thread.currentThread().getName() + " 返回响应:" + response + " ,处理耗时:" + (System.currentTimeMillis() - beginTime)); } catch (IOException e) { System.err.println(IOUtil.time() + Thread.currentThread().getName() + " 处理消息时异常结束:" + e.getMessage()); } finally { IOUtil.close(socket); } return result; } } /** * 任务结果 */ private class HandleResult { private String request; private String result; public HandleResult(String request) { this.request = request; } public String getRequest() { return request; } public void setResult(String result) { this.result = result; } @SuppressWarnings("unused") public String getResult() { return result; } } public static final int QUEUE_SIZE = 100; private static final int THREADPOOL_SIZE = 5; private static final int THREADPOOL_KEEPSECONDS = 1; private final ExtendExecutor handleExecService; private volatile AtomicBoolean state_running = new AtomicBoolean(false); private ServerSocket serverSocket; private Integer port; public BioServer(int port) { super("server[" + port + "]-acceptThread"); this.port = port; handleExecService = new ExtendExecutor(THREADPOOL_SIZE, THREADPOOL_SIZE, THREADPOOL_KEEPSECONDS, port); } @Override public void run() { try { serverSocket = new ServerSocket(port); } catch (IOException e) { System.err.println("服务启动异常:" + e.getMessage()); e.printStackTrace(); } System.out.println("服务启动端口:" + port); state_running.set(true); while (state_running.get()) { try { Socket socket = serverSocket.accept(); // new Thread(new HandleTask(socket)).start(); // executorService.execute(new HandleTask(socket)); BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); String request = null; if ((request = reader.readLine()) != null) { System.out.println(IOUtil.time() + Thread.currentThread().getName() + " 收到请求:" + request); /** * 这里拦下来只是为打印这个接收线程所做的事情,以便好观察<br> * 这里不能关闭,后面socket的输出流来返回响应,只能由最后处理的线程来关闭<br> */ handleExecService.submit(new HandleTask(socket, request)); } } catch (IOException e) { if (serverSocket.isClosed()) { System.err.println(IOUtil.time() + Thread.currentThread().getName() + " socket已经关闭,开始关闭服务"); } else { System.err.println(IOUtil.time() + " 服务异常关闭!端口" + port); e.printStackTrace(); } interrupt(); break; } } } @Override public void interrupt() { super.interrupt();// 标记中断 state_running.set(false); System.err.println(IOUtil.time() + Thread.currentThread().getName() + ":中断服务,已置为关闭状态,不再接收新的请求,将在一秒后关闭自己的工作线程"); /** * 服务线程在每次接收请求前会先检查state_running,如果状态为运行,则接收下一个请求<br> * 服务线程在响应中断关闭自己时,会先将自己的state_running置为false,然后去关闭线程池<br> * 其实知道线程池关闭也是标志自己一个状态,表示不再接受任务,再提交任务就抛RejectedExecutionException<br> * * 当其他线程中断服务线程时,会修改它的状态以及关闭它的线程池,但是这并不是原子操作<br> * 当服务线程在检查了状态接收了请求之后,并在放入线程池之前,<br> * 另外一个线程(这里是main)把它的线程池关闭了,这是可能抛出一个RejectedExecutionException<br> * 所以这里等一秒,如果一秒后还有任务没进线程池就在RejectedHandler中扔掉或者记下来 */ try { Thread.sleep(1000); } catch (InterruptedException e1) { // 忽略中断 } System.err.println(IOUtil.time() + Thread.currentThread().getName() + ":关闭工作线程池,不在处理新的任务,并中断正在执行任务的线程"); List<Runnable> taskList = handleExecService.shutdownNow(); if (!taskList.isEmpty()) { for (Runnable task : taskList) { @SuppressWarnings("unchecked") FutureTask<HandleResult> future = (FutureTask<HandleResult>) task; future.cancel(true); HandleResult result = null; try { result = future.get();// get方法,可以吐出提交的请求 if (result != null) { System.err.println( IOUtil.time() + Thread.currentThread().getName() + " 保存被取消的任务:" + result.getRequest()); } } catch (InterruptedException e) { // 忽略中断 } catch (ExecutionException e) { e.printStackTrace(); } } } IOUtil.close(serverSocket); System.err.println(IOUtil.time() + Thread.currentThread().getName() + ":服务已关闭,断开连接"); } /** * @测试 */ public static void main(String[] args) throws InterruptedException { BioServer server = new BioServer(4040); server.start(); Thread.sleep(7000); server.interrupt(); } }
测试结果:先开启服务,接着开20个客户端同时发请求,7秒后关闭服务。
代码中客户端的数目和发请求的频率,服务处理请求的延时和服务运行的时间都可以修改,以便看到不同的运行现象。
2017-10-07 10:20:18 557 client[127.0.0.1:4040]-Thread-19 发送请求:1/1 2017-10-07 10:20:18 558 client[127.0.0.1:4040]-Thread-12 发送请求:6/3 2017-10-07 10:20:18 558 client[127.0.0.1:4040]-Thread-16 发送请求:9+1 2017-10-07 10:20:18 558 client[127.0.0.1:4040]-Thread-4 发送请求:6+4 2017-10-07 10:20:18 558 client[127.0.0.1:4040]-Thread-8 发送请求:2/1 2017-10-07 10:20:18 559 client[127.0.0.1:4040]-Thread-17 发送请求:7-6 2017-10-07 10:20:18 559 client[127.0.0.1:4040]-Thread-7 发送请求:9-3 2017-10-07 10:20:18 559 client[127.0.0.1:4040]-Thread-5 发送请求:9-4 2017-10-07 10:20:18 559 client[127.0.0.1:4040]-Thread-9 发送请求:0*1 2017-10-07 10:20:18 559 client[127.0.0.1:4040]-Thread-3 发送请求:1+3 2017-10-07 10:20:18 559 client[127.0.0.1:4040]-Thread-1 发送请求:8/6 2017-10-07 10:20:18 560 client[127.0.0.1:4040]-Thread-15 发送请求:0*4 2017-10-07 10:20:18 560 client[127.0.0.1:4040]-Thread-13 发送请求:2/6 2017-10-07 10:20:18 560 client[127.0.0.1:4040]-Thread-2 发送请求:3/9 2017-10-07 10:20:18 560 client[127.0.0.1:4040]-Thread-10 发送请求:9*2 2017-10-07 10:20:18 557 client[127.0.0.1:4040]-Thread-14 发送请求:6/7 2017-10-07 10:20:18 560 client[127.0.0.1:4040]-Thread-18 发送请求:3+5 2017-10-07 10:20:18 557 client[127.0.0.1:4040]-Thread-11 发送请求:9/1 2017-10-07 10:20:18 561 client[127.0.0.1:4040]-Thread-6 发送请求:2*1 2017-10-07 10:20:18 557 client[127.0.0.1:4040]-Thread-20 发送请求:6*1 2017-10-07 10:20:20 392 client[127.0.0.1:4040]-Thread-9 收到响应:0.0 2017-10-07 10:20:20 408 client[127.0.0.1:4040]-Thread-18 收到响应:8.0 2017-10-07 10:20:20 445 client[127.0.0.1:4040]-Thread-15 收到响应:0.0 2017-10-07 10:20:21 429 client[127.0.0.1:4040]-Thread-4 收到响应:10.0 2017-10-07 10:20:21 434 client[127.0.0.1:4040]-Thread-6 收到响应:2.0 2017-10-07 10:20:21 548 client[127.0.0.1:4040]-Thread-2 收到响应:0.3333333333333333 2017-10-07 10:20:21 993 client[127.0.0.1:4040]-Thread-7 收到响应:6.0 2017-10-07 10:20:22 010 client[127.0.0.1:4040]-Thread-14 收到响应:0.8571428571428571 2017-10-07 10:20:22 468 client[127.0.0.1:4040]-Thread-13 收到响应:0.3333333333333333 2017-10-07 10:20:22 491 client[127.0.0.1:4040]-Thread-20 收到响应:6.0 2017-10-07 10:20:22 603 client[127.0.0.1:4040]-Thread-5 收到响应:5.0 2017-10-07 10:20:23 427 client[127.0.0.1:4040]-Thread-16 收到响应:null 2017-10-07 10:20:23 427 client[127.0.0.1:4040]-Thread-17 收到响应:null 2017-10-07 10:20:23 427 client[127.0.0.1:4040]-Thread-10 收到响应:null 2017-10-07 10:20:23 428 client[127.0.0.1:4040]-Thread-3 收到响应:null 2017-10-07 10:20:23 429 client[127.0.0.1:4040]-Thread-1 收到响应:null 2017-10-07 10:20:23 429 client[127.0.0.1:4040]-Thread-8 收到响应:null 2017-10-07 10:20:23 430 client[127.0.0.1:4040]-Thread-11 收到响应:null 2017-10-07 10:20:23 430 client[127.0.0.1:4040]-Thread-12 收到响应:null 2017-10-07 10:20:23 431 client[127.0.0.1:4040]-Thread-19 收到响应:null 2017-10-07 10:20:24 439 client[127.0.0.1:4040]-Thread-12 连接服务异常:[Connection refused: connect],中断自己 2017-10-07 10:20:24 439 client[127.0.0.1:4040]-Thread-11 连接服务异常:[Connection refused: connect],中断自己 2017-10-07 10:20:24 439 client[127.0.0.1:4040]-Thread-12 被中断,结束自己 clientBlockingNum/clientRunningNum:11/18 2017-10-07 10:20:24 441 client[127.0.0.1:4040]-Thread-11 被中断,结束自己 clientBlockingNum/clientRunningNum:11/18 2017-10-07 10:20:24 442 client[127.0.0.1:4040]-Thread-16 连接服务异常:[Connection refused: connect],中断自己 2017-10-07 10:20:24 442 client[127.0.0.1:4040]-Thread-16 被中断,结束自己 clientBlockingNum/clientRunningNum:11/17 2017-10-07 10:20:24 442 client[127.0.0.1:4040]-Thread-19 连接服务异常:[Connection refused: connect],中断自己 2017-10-07 10:20:24 442 client[127.0.0.1:4040]-Thread-19 被中断,结束自己 clientBlockingNum/clientRunningNum:11/16 2017-10-07 10:20:24 443 client[127.0.0.1:4040]-Thread-3 连接服务异常:[Connection refused: connect],中断自己 2017-10-07 10:20:24 443 client[127.0.0.1:4040]-Thread-3 被中断,结束自己 clientBlockingNum/clientRunningNum:11/15 2017-10-07 10:20:24 444 client[127.0.0.1:4040]-Thread-8 连接服务异常:[Connection refused: connect],中断自己 2017-10-07 10:20:24 444 client[127.0.0.1:4040]-Thread-8 被中断,结束自己 clientBlockingNum/clientRunningNum:11/14 2017-10-07 10:20:24 444 client[127.0.0.1:4040]-Thread-1 连接服务异常:[Connection refused: connect],中断自己 2017-10-07 10:20:24 444 client[127.0.0.1:4040]-Thread-1 被中断,结束自己 clientBlockingNum/clientRunningNum:11/13 2017-10-07 10:20:24 444 client[127.0.0.1:4040]-Thread-10 连接服务异常:[Connection refused: connect],中断自己 2017-10-07 10:20:24 450 client[127.0.0.1:4040]-Thread-10 被中断,结束自己 clientBlockingNum/clientRunningNum:11/12 2017-10-07 10:20:24 451 client[127.0.0.1:4040]-Thread-17 连接服务异常:[Connection refused: connect],中断自己 2017-10-07 10:20:24 451 client[127.0.0.1:4040]-Thread-17 被中断,结束自己 clientBlockingNum/clientRunningNum:11/11 2017-10-07 10:20:24 451 client[127.0.0.1:4040]-Thread-20 发送请求:2+6 2017-10-07 10:20:24 452 client[127.0.0.1:4040]-Thread-20 异常中断:Connection reset 2017-10-07 10:20:24 452 client[127.0.0.1:4040]-Thread-20 被中断,结束自己 clientBlockingNum/clientRunningNum:0/10 2017-10-07 10:20:24 457 client[127.0.0.1:4040]-Thread-5 发送请求:6-7 2017-10-07 10:20:24 457 client[127.0.0.1:4040]-Thread-5 异常中断:Connection reset 2017-10-07 10:20:24 457 client[127.0.0.1:4040]-Thread-5 被中断,结束自己 clientBlockingNum/clientRunningNum:0/9 2017-10-07 10:20:24 459 client[127.0.0.1:4040]-Thread-13 发送请求:7-4 2017-10-07 10:20:24 460 client[127.0.0.1:4040]-Thread-13 异常中断:Connection reset 2017-10-07 10:20:24 460 client[127.0.0.1:4040]-Thread-13 被中断,结束自己 clientBlockingNum/clientRunningNum:0/8 2017-10-07 10:20:24 460 client[127.0.0.1:4040]-Thread-14 发送请求:3-2 2017-10-07 10:20:24 460 client[127.0.0.1:4040]-Thread-7 发送请求:9-1 2017-10-07 10:20:24 461 client[127.0.0.1:4040]-Thread-14 异常中断:Connection reset 2017-10-07 10:20:24 461 client[127.0.0.1:4040]-Thread-7 异常中断:Connection reset 2017-10-07 10:20:24 461 client[127.0.0.1:4040]-Thread-14 被中断,结束自己 clientBlockingNum/clientRunningNum:0/7 2017-10-07 10:20:24 461 client[127.0.0.1:4040]-Thread-7 被中断,结束自己 clientBlockingNum/clientRunningNum:0/6 2017-10-07 10:20:24 461 client[127.0.0.1:4040]-Thread-2 发送请求:7-9 2017-10-07 10:20:24 462 client[127.0.0.1:4040]-Thread-2 异常中断:Connection reset 2017-10-07 10:20:24 462 client[127.0.0.1:4040]-Thread-2 被中断,结束自己 clientBlockingNum/clientRunningNum:0/5 2017-10-07 10:20:24 463 client[127.0.0.1:4040]-Thread-6 异常中断:Connection reset 2017-10-07 10:20:24 462 client[127.0.0.1:4040]-Thread-6 发送请求:1*5 2017-10-07 10:20:24 463 client[127.0.0.1:4040]-Thread-4 发送请求:6/5 2017-10-07 10:20:24 463 client[127.0.0.1:4040]-Thread-6 被中断,结束自己 clientBlockingNum/clientRunningNum:0/4 2017-10-07 10:20:24 463 client[127.0.0.1:4040]-Thread-4 异常中断:Connection reset 2017-10-07 10:20:24 463 client[127.0.0.1:4040]-Thread-4 被中断,结束自己 clientBlockingNum/clientRunningNum:0/3 2017-10-07 10:20:24 464 client[127.0.0.1:4040]-Thread-15 发送请求:1*3 2017-10-07 10:20:24 464 client[127.0.0.1:4040]-Thread-15 异常中断:Connection reset 2017-10-07 10:20:24 464 client[127.0.0.1:4040]-Thread-15 被中断,结束自己 clientBlockingNum/clientRunningNum:0/2 2017-10-07 10:20:24 465 client[127.0.0.1:4040]-Thread-18 发送请求:7/7 2017-10-07 10:20:24 466 client[127.0.0.1:4040]-Thread-18 异常中断:Connection reset 2017-10-07 10:20:24 466 client[127.0.0.1:4040]-Thread-18 被中断,结束自己 clientBlockingNum/clientRunningNum:0/1 2017-10-07 10:20:24 466 client[127.0.0.1:4040]-Thread-9 发送请求:5/8 2017-10-07 10:20:24 471 client[127.0.0.1:4040]-Thread-9 异常中断:Connection reset 2017-10-07 10:20:24 471 client[127.0.0.1:4040]-Thread-9 被中断,结束自己 clientBlockingNum/clientRunningNum:0/0
服务启动端口:4040 2017-10-07 10:20:18 597 server[4040]-acceptThread 收到请求:9-3 2017-10-07 10:20:18 603 server[4040]-acceptThread 收到请求:3+5 2017-10-07 10:20:18 603 server[4040]-pool-handleThread-1 处理请求:9-3 2017-10-07 10:20:18 603 server[4040]-acceptThread 收到请求:0*4 2017-10-07 10:20:18 604 server[4040]-pool-handleThread-2 处理请求:3+5 2017-10-07 10:20:18 605 server[4040]-acceptThread 收到请求:0*1 2017-10-07 10:20:18 605 server[4040]-pool-handleThread-3 处理请求:0*4 2017-10-07 10:20:18 606 server[4040]-acceptThread 收到请求:6/7 2017-10-07 10:20:18 607 server[4040]-acceptThread 收到请求:2*1 2017-10-07 10:20:18 608 server[4040]-pool-handleThread-4 处理请求:0*1 2017-10-07 10:20:18 608 server[4040]-acceptThread 收到请求:6+4 2017-10-07 10:20:18 608 server[4040]-pool-handleThread-5 处理请求:6/7 2017-10-07 10:20:18 608 server[4040]-acceptThread 收到请求:3/9 2017-10-07 10:20:18 609 server[4040]-acceptThread 收到请求:6*1 2017-10-07 10:20:18 609 server[4040]-acceptThread 收到请求:2/6 2017-10-07 10:20:18 609 server[4040]-acceptThread 收到请求:9-4 2017-10-07 10:20:18 609 server[4040]-acceptThread 收到请求:8/6 2017-10-07 10:20:18 611 server[4040]-acceptThread 收到请求:1+3 2017-10-07 10:20:18 612 server[4040]-acceptThread 收到请求:7-6 2017-10-07 10:20:18 612 server[4040]-acceptThread 收到请求:9*2 2017-10-07 10:20:18 612 server[4040]-acceptThread 收到请求:9+1 2017-10-07 10:20:18 613 server[4040]-acceptThread 收到请求:2/1 2017-10-07 10:20:18 614 server[4040]-acceptThread 收到请求:9/1 2017-10-07 10:20:18 614 server[4040]-acceptThread 收到请求:6/3 2017-10-07 10:20:18 614 server[4040]-acceptThread 收到请求:1/1 2017-10-07 10:20:20 392 server[4040]-pool-handleThread-4 返回响应:0.0 ,处理耗时:1784 2017-10-07 10:20:20 392 server[4040]-pool-handleThread-4 处理请求:2*1 2017-10-07 10:20:20 410 server[4040]-pool-handleThread-2 返回响应:8.0 ,处理耗时:1806 2017-10-07 10:20:20 410 server[4040]-pool-handleThread-2 处理请求:6+4 2017-10-07 10:20:20 445 server[4040]-pool-handleThread-3 返回响应:0.0 ,处理耗时:1840 2017-10-07 10:20:20 445 server[4040]-pool-handleThread-3 处理请求:3/9 2017-10-07 10:20:21 429 server[4040]-pool-handleThread-2 返回响应:10.0 ,处理耗时:1019 2017-10-07 10:20:21 430 server[4040]-pool-handleThread-2 处理请求:6*1 2017-10-07 10:20:21 434 server[4040]-pool-handleThread-4 返回响应:2.0 ,处理耗时:1041 2017-10-07 10:20:21 434 server[4040]-pool-handleThread-4 处理请求:2/6 2017-10-07 10:20:21 548 server[4040]-pool-handleThread-3 返回响应:0.3333333333333333 ,处理耗时:1102 2017-10-07 10:20:21 548 server[4040]-pool-handleThread-3 处理请求:9-4 2017-10-07 10:20:21 993 server[4040]-pool-handleThread-1 返回响应:6.0 ,处理耗时:3390 2017-10-07 10:20:21 994 server[4040]-pool-handleThread-1 处理请求:8/6 2017-10-07 10:20:22 010 server[4040]-pool-handleThread-5 返回响应:0.8571428571428571 ,处理耗时:3402 2017-10-07 10:20:22 011 server[4040]-pool-handleThread-5 处理请求:1+3 2017-10-07 10:20:22 392 main:中断服务,已置为关闭状态,不再接收新的请求,将在一秒后关闭自己的工作线程 2017-10-07 10:20:22 467 server[4040]-pool-handleThread-4 返回响应:0.3333333333333333 ,处理耗时:1034 2017-10-07 10:20:22 468 server[4040]-pool-handleThread-4 处理请求:7-6 2017-10-07 10:20:22 491 server[4040]-pool-handleThread-2 返回响应:6.0 ,处理耗时:1061 2017-10-07 10:20:22 492 server[4040]-pool-handleThread-2 处理请求:9*2 2017-10-07 10:20:22 603 server[4040]-pool-handleThread-3 返回响应:5.0 ,处理耗时:1055 2017-10-07 10:20:22 604 server[4040]-pool-handleThread-3 处理请求:9+1 2017-10-07 10:20:23 426 main:关闭工作线程池,不在处理新的任务,并中断正在执行任务的线程 2017-10-07 10:20:23 427 server[4040]-pool-handleThread-3 执行过程中强行中断:9+1 2017-10-07 10:20:23 428 server[4040]-pool-handleThread-4 执行过程中强行中断:7-6 2017-10-07 10:20:23 428 main 取消在等待被执行的任务:2/1 2017-10-07 10:20:23 429 server[4040]-pool-handleThread-1 执行过程中强行中断:8/6 2017-10-07 10:20:23 428 server[4040]-pool-handleThread-5 执行过程中强行中断:1+3 2017-10-07 10:20:23 429 main 保存被取消的任务:2/1 2017-10-07 10:20:23 429 main 取消在等待被执行的任务:9/1 2017-10-07 10:20:23 430 main 保存被取消的任务:9/1 2017-10-07 10:20:23 430 main 取消在等待被执行的任务:6/3 2017-10-07 10:20:23 430 main 保存被取消的任务:6/3 2017-10-07 10:20:23 430 main 取消在等待被执行的任务:1/1 2017-10-07 10:20:23 431 main 保存被取消的任务:1/1 2017-10-07 10:20:23 431 main:服务已关闭,断开连接 2017-10-07 10:20:23 435 server[4040]-pool-handleThread-2 执行过程中强行中断:9*2 2017-10-07 10:20:24 467 server[4040]-acceptThread 扔掉一个任务 2017-10-07 10:20:24 467 server[4040]-acceptThread 收到请求:5/8
2、NIO 非阻塞I/O
JDK 1.4中的java.nio.*包中引入新的Java I/O库,其目的是提高速度。实际上,“旧”的I/O包已经使用NIO重新实现过,即使我们不显式的使用NIO编程,也能从中受益。速度的提高在文件I/O和网络I/O中都可能会发生。
NIO提供了与传统BIO模型中的Socket和ServerSocket相对应的SocketChannel和ServerSocketChannel两种不同的套接字通道实现。
2.1、缓冲区 Buffer
Buffer是一个对象,包含一些要写入或者读出的数据。
在NIO库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,也是写入到缓冲区中。任何时候访问NIO中的数据,都是通过
缓冲区进行操作。缓冲区实际上是一个数组,并提供了对数据结构化访问以及维护读写位置等信息。
2.2、通道 Channel
我们对数据的读取和写入要通过channel,它就像水管一样,是一个通道。channel不同于stream的地方在于它是双向的,可以读或者写,而BIO中读取和写入首先要分别获取socket的inputstream和outputstream。在操作系统底层的通道一般都是全双工的,所以全双工的channel比stream能更好的映射底层操作系统的api。
Channel主要分两大类:
1.SelectableChannel:网络读写, 其子类:ServerSocketChannel 和 SocketChannel
2.FileChannel:文件操作
2.3、多路复用器 Selector
selector是NIO 编程的基础。
在多路复用器关闭后,所有注册在上面的channel和pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源。
selector提供选择已经就绪的任务的能力,selector会不断轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被
selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。
NIO示例:
package com.io.Nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import com.util.IOUtil; /** * Nio客户端 * * @author shanhm1991 * @date 2017年9月21日 */ public class NioClient extends Thread { private static volatile AtomicInteger client_index = new AtomicInteger(0); private int port; private String host; private Selector selector; private SocketChannel socketChannel; private CountDownLatch latch = new CountDownLatch(1); private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private volatile AtomicBoolean ready = new AtomicBoolean(false); private NioClient(String host, int port) { super("client[" + host + ":" + port + "]-Thread-" + client_index.incrementAndGet()); this.host = host; this.port = port; } @Override public void run() { long begin = System.currentTimeMillis(); System.out.println("启动客户端"); try { selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); if (socketChannel.connect(new InetSocketAddress(host, port))) { System.out.println("启动耗时:" + (System.currentTimeMillis() - begin)); latch.countDown(); } else { socketChannel.register(selector, SelectionKey.OP_CONNECT); } } catch (IOException e) { System.out.println(IOUtil.time() + this.getName() + ":" + e.getMessage()); IOUtil.close(selector); return; } while (true) { try { selector.select(); for (Iterator<SelectionKey> it = selector.selectedKeys().iterator(); it.hasNext();) { SelectionKey key = it.next(); it.remove(); if (!key.isValid()) { continue; } SocketChannel channel = (SocketChannel) key.channel(); if (key.isConnectable()) { /** * Once the connection is established, or the attempt has failed,<br> * the socket channel will become connectable <br> * and this method may be invoked to complete the connection sequence. */ if (channel.finishConnect()) { System.out.println("启动耗时:" + (System.currentTimeMillis() - begin)); latch.countDown(); }else{ System.err.println(IOUtil.time() + this.getName() + ":连接服务失败" ); IOUtil.close(selector); return; } } if(key.isReadable()){ ByteBuffer buffer = ByteBuffer.allocate(1024); int readBytes = channel.read(buffer); /** * readBytes>0和readBytes=0,属于正常场景 * readBytes=-1表示链路已经关闭 */ if(readBytes > 0){ buffer.flip(); byte[] byteArray = new byte[buffer.remaining()]; buffer.get(byteArray); String response = new String(byteArray, "UTF-8"); System.out.println(IOUtil.time() + this.getName() + " 收到响应:" + response); lock.lock(); try{ ready.set(false); condition.signalAll(); }finally{ lock.unlock(); } }else if(readBytes < 0){ key.cancel(); channel.close(); } } } } catch (IOException e) { lock.lock(); try{ ready.set(false); condition.signalAll(); }finally{ lock.unlock(); } System.err.println(IOUtil.time() + this.getName() + ":异常关闭," + e.getMessage()); IOUtil.close(selector); return; } } } public void request(String request) throws InterruptedException { latch.await(); /** * 由于nio是非阻塞的,channel的读操作不会阻塞写操作,所以这里会一直发送消息,不管有没收到响应<br> * 又这里发的消息只是一个简单的字符串,服务端在接收消息时也没做处理<br> * 如果这里不加一个阻塞点,等上次响应回来后,再发送下一次,服务端会将一下子读出多个请求并当做一个请求处理。<br> * 其实这样是不可取的,真正发消息时双方肯定是有协议的,服务能够根据协议信息区分出每个请求<br> */ lock.lock(); try{ while(ready.get()){ condition.await(); } ready.set(true); }finally{ lock.unlock(); } byte[] bytes = request.getBytes(); ByteBuffer buffer = ByteBuffer.allocate(bytes.length); buffer.put(bytes); buffer.flip(); System.out.println(IOUtil.time() + Thread.currentThread().getName() + " 发送请求:" + request); try { socketChannel.write(buffer); socketChannel.register(selector, SelectionKey.OP_READ); } catch (ClosedSelectorException e){ Thread.currentThread().interrupt(); } catch (IOException e) { System.err.println(e.getMessage()); IOUtil.close(selector); Thread.currentThread().interrupt(); } } /** * @测试 */ public static void main(final String[] args) throws IOException { NioClient client = new NioClient("127.0.0.1", 8080); client.start(); while(true){ try { client.request(IOUtil.buildRequest()); } catch (InterruptedException e) { System.err.println(IOUtil.time() + Thread.currentThread().getName() + " 发送失败,客户端结束自己"); return; } } } }
package com.io.Nio; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import com.util.Calculator; import com.util.IOUtil; /** * Nio服务端 * * @author shanhm1991 * @date 2017年9月21日 */ public class NioServer extends Thread { private int port; private Selector selector; private ServerSocketChannel serverChannel; public NioServer(int port) { this.port = port; } @Override public void run() { try { selector = Selector.open(); serverChannel = ServerSocketChannel.open(); serverChannel.configureBlocking(false); serverChannel.socket().bind(new InetSocketAddress(port), 1024); serverChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("启动服务端口:" + port); } catch (IOException e) { System.err.println(IOUtil.time() + "启动失败"); e.printStackTrace(); IOUtil.close(serverChannel); return; } while (true) { try { selector.select(); } catch (IOException e) { System.err.print(IOUtil.time() + "服务异常关闭"); e.printStackTrace(); break; } catch (ClosedSelectorException e) { System.err.print(IOUtil.time() + "服务已经被关闭 "); break; } for (Iterator<SelectionKey> it = selector.selectedKeys().iterator(); it.hasNext();) { SelectionKey key = it.next(); it.remove(); try { if (!key.isValid()) { continue; } if (key.isAcceptable()) { ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel(); SocketChannel channel = serverChannel.accept(); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); } if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); if (channel.read(buffer) > 0) { buffer.flip(); byte[] byteArray = new byte[buffer.remaining()]; buffer.get(byteArray); String request = new String(byteArray, "UTF-8"); System.out.println(IOUtil.time() + "服务收到请求:" + request); response(channel, request); } } } catch (IOException e) { IOUtil.close(key.channel()); System.err.println(IOUtil.time() + ":" + e.getMessage()); } } } } private void response(SocketChannel channel, String request) throws IOException { long begin = System.currentTimeMillis(); for (int i = 0; i < 6; i++) { for (int m = 0; m < 995; m++) { for (int n = 0; n < 99999; n++) { if (Thread.interrupted()) { //可以设置响应中断 } } } } String response = String.valueOf(Calculator.conversion(request)); byte[] bytes = response.getBytes(); ByteBuffer buffer = ByteBuffer.allocate(bytes.length); buffer.put(bytes); buffer.flip(); System.out.println(IOUtil.time() + "服务发送响应:" + response + ",处理耗时:" + (System.currentTimeMillis() - begin)); channel.write(buffer); } public void shutdown(){ IOUtil.close(selector); } /** * @测试 */ public static void main(String[] args) throws InterruptedException { NioServer server = new NioServer(8080); server.start(); Thread.sleep(10000); server.shutdown(); } }
测试结果:先开启服务,接着开客户端发送请求,10秒后关闭服务。
启动客户端 启动耗时:43 2017-10-07 11:10:28 736 main 发送请求:0+6 2017-10-07 11:10:30 050 client[127.0.0.1:8080]-Thread-1 收到响应:6.0 2017-10-07 11:10:30 050 main 发送请求:5-8 2017-10-07 11:10:30 551 client[127.0.0.1:8080]-Thread-1 收到响应:-3.0 2017-10-07 11:10:30 552 main 发送请求:4-3 2017-10-07 11:10:31 872 client[127.0.0.1:8080]-Thread-1 收到响应:1.0 2017-10-07 11:10:31 872 main 发送请求:8/9 2017-10-07 11:10:32 366 client[127.0.0.1:8080]-Thread-1 收到响应:0.888888888888889 2017-10-07 11:10:32 366 main 发送请求:3-7 2017-10-07 11:10:32 869 client[127.0.0.1:8080]-Thread-1 收到响应:-4.0 2017-10-07 11:10:32 870 main 发送请求:7/5 2017-10-07 11:10:33 354 client[127.0.0.1:8080]-Thread-1 收到响应:1.4 2017-10-07 11:10:33 354 main 发送请求:0/6 2017-10-07 11:10:33 863 client[127.0.0.1:8080]-Thread-1 收到响应:0.0 2017-10-07 11:10:33 863 main 发送请求:8*7 2017-10-07 11:10:34 362 client[127.0.0.1:8080]-Thread-1 收到响应:56.0 2017-10-07 11:10:34 363 main 发送请求:1-5 2017-10-07 11:10:34 787 client[127.0.0.1:8080]-Thread-1 收到响应:-4.0 2017-10-07 11:10:34 788 main 发送请求:5-8 2017-10-07 11:10:34 799 main 发送请求:1*6 2017-10-07 11:10:34 795 client[127.0.0.1:8080]-Thread-1:异常关闭,远程主机强迫关闭了一个现有的连接。 2017-10-07 11:10:34 799 main 发送失败,客户端结束自己
启动服务端口:8080 2017-10-07 11:10:28 771 服务收到请求:0+6 2017-10-07 11:10:30 049 服务发送响应:6.0,处理耗时:1278 2017-10-07 11:10:30 051 服务收到请求:5-8 2017-10-07 11:10:30 551 服务发送响应:-3.0,处理耗时:500 2017-10-07 11:10:30 552 服务收到请求:4-3 2017-10-07 11:10:31 871 服务发送响应:1.0,处理耗时:1319 2017-10-07 11:10:31 872 服务收到请求:8/9 2017-10-07 11:10:32 365 服务发送响应:0.888888888888889,处理耗时:494 2017-10-07 11:10:32 367 服务收到请求:3-7 2017-10-07 11:10:32 869 服务发送响应:-4.0,处理耗时:502 2017-10-07 11:10:32 870 服务收到请求:7/5 2017-10-07 11:10:33 353 服务发送响应:1.4,处理耗时:483 2017-10-07 11:10:33 354 服务收到请求:0/6 2017-10-07 11:10:33 863 服务发送响应:0.0,处理耗时:508 2017-10-07 11:10:33 864 服务收到请求:8*7 2017-10-07 11:10:34 362 服务发送响应:56.0,处理耗时:498 2017-10-07 11:10:34 363 服务收到请求:1-5 2017-10-07 11:10:34 786 服务发送响应:-4.0,处理耗时:424 2017-10-07 11:10:34 787 服务已经被关闭
这个代码写到这里,应该都会有个想法,可以设计一下,客户端进程起一个线程对象T专门负责收发消息,维护一个selector和一组channel,其他的业务线程都通过T提供的channel(T已经把它注册在selector上)来发消息,由T来接收响应并交给对应的线程(消息中会有对应的标识)。
另外,由于socketchannel是异步非阻塞的,它并不保证一次能够把需要发送的字节数组发送完,此时会出现写半包的问题,需要注册写操作,不断轮训selector将没有发送完的bytebuffer发送完毕,然后可以通过bytebuffer的hasRemain()方法判断消息是否发送完成。
3、AIO 异步非阻塞I/O
Nio 2.0的异步套接字通道是真正的异步非阻塞I/O,它对应于UNIX网络编程中的事件驱动I/O,(Nio对应I/O复用模型,调用系统的epoll()轮询channel)。它不需要通过多路复用器selector对注册的通道进行轮询操作即可实现异步读写,从而简化了Nio的编程模型。
AIO示例:
package com.io.Aio.callback; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; import com.util.IOUtil; /** * @author shanhm1991 * @date 2017年10月9日 */ public class AioClient extends Thread { private class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel channel; private CountDownLatch clientLatch; private CountDownLatch requestLatch; private Thread requestThread; public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch clientLatch,CountDownLatch requestLatch,Thread requestThread) { this.channel = clientChannel; this.clientLatch = clientLatch; this.requestLatch = requestLatch; this.requestThread = requestThread; } @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); final byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); try { System.out.println(IOUtil.time() + "客户端收到响应:" + new String(bytes, "UTF-8")); } catch (UnsupportedEncodingException e) { System.err.println(this.getClass().getName() + ":" + e.getMessage()); } requestLatch.countDown(); } @Override public void failed(final Throwable e, ByteBuffer attachment) { System.err.println(IOUtil.time() + "客户端获取响应失败," + this.getClass().getName() + ":" + e.getMessage()); /** * 结束客户端 */ requestThread.interrupt(); clientLatch.countDown(); IOUtil.close(channel); } } private class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel channel; private CountDownLatch clientLatch; private CountDownLatch requestLatch; private Thread requestThread; public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch clientLatch,CountDownLatch requestLatch,Thread requestThread) { this.channel = clientChannel; this.clientLatch = clientLatch; this.requestLatch = requestLatch; this.requestThread = requestThread; } @Override public void completed(final Integer result, ByteBuffer buffer) { if (buffer.hasRemaining()) { channel.write(buffer, buffer, this); } else { ByteBuffer readBuffer = ByteBuffer.allocate(1024); channel.read(readBuffer, readBuffer, new ReadHandler(channel,clientLatch,requestLatch,requestThread)); } } @Override public void failed(Throwable e, ByteBuffer attachment) { System.err.println(IOUtil.time() + "发送失败," + this.getClass().getName() + ":" + e.getMessage()); /** * 结束客户端 */ requestThread.interrupt(); clientLatch.countDown(); IOUtil.close(channel); } } private class ConnectHandler implements CompletionHandler<Void, AioClient> { @Override public void completed(Void result, AioClient attachment) { System.out.println("客户端成功连接到服务器..."); } @Override public void failed(Throwable e, AioClient client) { client.clientLatch.countDown(); IOUtil.close(channel); System.err.println("连接服务器失败," + this.getClass().getName() + ":" + e.getMessage()); } } private AsynchronousSocketChannel channel; private String host; private int port; private CountDownLatch clientLatch; public AioClient(String host, int port) { this.host = host; this.port = port; } @Override public void run() { try { channel = AsynchronousSocketChannel.open(); } catch (IOException e) { System.out.println("客户端启动异常:" + e.getMessage()); return; } clientLatch = new CountDownLatch(1); channel.connect(new InetSocketAddress(host, port), this, new ConnectHandler()); /** * clientLatch将客户端线程阻塞住,不让线程结束<br> * 它可以响应中断,如果想结束客户端线程,可以中断它。 */ try { clientLatch.await(); } catch (final InterruptedException e1) { clientLatch.countDown(); } IOUtil.close(channel); } public void request( String msg) throws InterruptedException { /** * 使用requestLatch强行将请求动作阻塞住,当readhandler中complete后才放开。<br> * 主要是避免 java.nio.channels.ReadPendingException。 * 如果一个读写操作没有完成,程序又发送一个读写操作命令,则导致ReadPendingException或者WritePendingException。 * 这样的设计并不好,实际场景应该不会这样搞。 * 由于这里只有一个channel,又在main中使用while(true)发消息,并且在服务中处理加了延时,所以如果不阻塞住,这里很容易导致异常。 * 所以这里多传了几个构造参数,是为了在read或者writer过程中出现异常时直接关闭客户端。 */ CountDownLatch requestLatch = new CountDownLatch(1); Thread requestThread = Thread.currentThread(); byte[] req = msg.getBytes(); ByteBuffer buffer = ByteBuffer.allocate(req.length); buffer.put(req); buffer.flip(); channel.write(buffer, buffer, new WriteHandler(channel,clientLatch,requestLatch,requestThread)); requestLatch.await(); } /** * @测试 */ public static void main(String[] args) { AioClient client = new AioClient("127.0.0.1", 8080); client.start(); while(true){ String request = IOUtil.buildRequest(); System.out.println(IOUtil.time() + "客户端发送请求:" + request); try { client.request(request); } catch (InterruptedException e) { System.out.println(IOUtil.time() + "客户端已关闭,结束发送"); return; } } } }
package com.io.Aio.callback; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; import java.net.StandardSocketOptions; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.CountDownLatch; import com.util.Calculator; import com.util.IOUtil; /** * @author shanhm1991 * @date 2017年10月9日 */ public class AioServer extends Thread { private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AioServer> { @Override public void completed(AsynchronousSocketChannel channel, AioServer server) { server.serverChannel.accept(server,this); ByteBuffer buffer = ByteBuffer.allocate(1024); channel.read(buffer, buffer, new ReadHandler(channel)); } @Override public void failed(Throwable e, AioServer server) { System.err.println(this.getClass().getName() + ":" + e.getMessage()); } } private class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel channel; public ReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); byte[] message = new byte[buffer.remaining()]; buffer.get(message); String request = null; String response = null; long begin = System.currentTimeMillis(); try { request = new String(message, "UTF-8"); for (int i = 0; i < 9; i++) { for (int m = 0; m < 9999; m++) { for (int n = 0; n < 9999; n++) { if (Thread.interrupted()) { //可以设置响应中断 } } } } response = String.valueOf(Calculator.conversion(request)); } catch (UnsupportedEncodingException e) { request = "解析错误请求"; response = "解析错误请求"; System.err.println(this.getClass().getName() + ":" + e.getMessage()); } System.out.println(IOUtil.time() + "服务端收到消息: " + request); byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); System.out.println(IOUtil.time() + "服务端发送响应: " + response + ",处理耗时:" + (System.currentTimeMillis() - begin)); channel.write(writeBuffer, writeBuffer,new WriteHandler(channel)); } @Override public void failed(Throwable e, ByteBuffer attachment) { System.err.println(this.getClass().getName() + ":" + e.getMessage()); IOUtil.close(channel); } } private class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel channel; public WriteHandler(AsynchronousSocketChannel channel) { this.channel = channel; } @Override public void completed(Integer result, ByteBuffer buffer) { if (buffer.hasRemaining()) //如果没有发送完,就继续发送直到完成 channel.write(buffer, buffer, this); else{ ByteBuffer readBuffer = ByteBuffer.allocate(1024); channel.read(readBuffer, readBuffer, new ReadHandler(channel)); } } @Override public void failed(Throwable e, ByteBuffer attachment) { System.err.println(this.getClass().getName() + ":" + e.getMessage()); IOUtil.close(channel); } } private AsynchronousServerSocketChannel serverChannel; public AioServer(final int port) { try { serverChannel = AsynchronousServerSocketChannel.open(); serverChannel.bind(new InetSocketAddress(port)); serverChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true); System.out.println("启动服务端口:" + port); } catch (final IOException e) { System.err.println(this.getClass().getName() + ":" + e.getMessage()); } } @Override public void run() { CountDownLatch latch = new CountDownLatch(1); serverChannel.accept(this, new AcceptHandler()); try { /** * 利用CountDownLatch将线程阻塞住。 * 因为AIO是异步的,此处如果不阻塞的话,线程会很快执行完毕,并会关闭通道 */ latch.await(); } catch (InterruptedException e) { IOUtil.close(serverChannel); System.out.println(IOUtil.time() + "服务关闭"); } } /** * @测试 */ public static void main(String[] args) throws InterruptedException { AioServer server = new AioServer(8080); server.start(); Thread.sleep(10000); server.interrupt(); } }
测试结果:先开启服务,接着开客户端发送请求,10秒后关闭服务。
客户端成功连接到服务器... 2017-10-09 01:16:26 699 客户端发送请求:8-6 2017-10-09 01:16:28 655 客户端收到响应:2.0 2017-10-09 01:16:28 656 客户端发送请求:7-10 2017-10-09 01:16:30 603 客户端收到响应:-3.0 2017-10-09 01:16:30 603 客户端发送请求:4-6 2017-10-09 01:16:31 397 客户端收到响应:-2.0 2017-10-09 01:16:31 397 客户端发送请求:8+4 2017-10-09 01:16:32 141 客户端收到响应:12.0 2017-10-09 01:16:32 141 客户端发送请求:4+8 2017-10-09 01:16:32 923 客户端收到响应:12.0 2017-10-09 01:16:32 924 客户端发送请求:8-4 2017-10-09 01:16:33 998 客户端已关闭,结束发送 2017-10-09 01:16:33 997 客户端获取响应失败,com.io.Aio.callback.AioClient$ReadHandler:指定的网络名不再可用。
启动服务端口:8080 2017-10-09 01:16:28 653 服务端收到消息: 8-6 2017-10-09 01:16:28 654 服务端发送响应: 2.0,处理耗时:1952 2017-10-09 01:16:30 601 服务端收到消息: 7-10 2017-10-09 01:16:30 602 服务端发送响应: -3.0,处理耗时:1946 2017-10-09 01:16:31 396 服务端收到消息: 4-6 2017-10-09 01:16:31 396 服务端发送响应: -2.0,处理耗时:792 2017-10-09 01:16:32 140 服务端收到消息: 8+4 2017-10-09 01:16:32 141 服务端发送响应: 12.0,处理耗时:744 2017-10-09 01:16:32 923 服务端收到消息: 4+8 2017-10-09 01:16:32 923 服务端发送响应: 12.0,处理耗时:781 2017-10-09 01:16:33 098 服务关闭
代码中强行让下一次请求等上上一次响应接收完成后才允许下一次请求只是为了避免ReadPendingException或者WritePendingException,很无奈的方式。
Aio的api提供了两种方式callback和future,上面的示例是callback方式,这里再贴一个简单的future实现示例,后续有时间会把代码改得更好一点。
AIO示例:Future实现方式
package com.io.Aio.future; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.charset.Charset; import java.util.concurrent.ExecutionException; import com.util.IOUtil; public class AioClient extends Thread{ private String host; private int port; private AsynchronousSocketChannel socketChannel; public AioClient(String host, int port) { this.host = host; this.port = port; } @Override public void run(){ try{ socketChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { System.out.println(e.getMessage()); return; } if (!socketChannel.isOpen()) { System.out.println("客户端启动失败"); return; } Void isConnect = null; try { isConnect = socketChannel.connect(new InetSocketAddress(host, port)).get();//阻塞 } catch (InterruptedException | ExecutionException e) { System.out.println(e.getMessage()); return; } //返回null表示连接成功 if(!(isConnect == null)){ System.out.println("连接服务失败"); }else{ System.out.println("连接服务成功"); } } public void reruest(String request) throws InterruptedException, ExecutionException{ System.out.println("发送请求:" + request); byte[] req = request.getBytes(); ByteBuffer buffer = ByteBuffer.allocate(req.length); buffer.put(req); buffer.flip(); socketChannel.write(buffer).get(); buffer = ByteBuffer.allocateDirect(1024); socketChannel.read(buffer).get(); buffer.flip(); CharBuffer decode = Charset.defaultCharset().decode(buffer); System.out.println("收到响应:" + decode.toString()); } public static void main(String[] args) throws InterruptedException { AioClient client = new AioClient("127.0.0.1", 7070); client.start(); try { client.reruest(IOUtil.buildRequest()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }
package com.io.Aio.future; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.charset.Charset; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import com.util.Calculator; public class AioServer extends Thread{ private int port; private ExecutorService exec; private AsynchronousServerSocketChannel serverSocketChannel; public AioServer(int port){ this.port = port; exec = Executors.newCachedThreadPool(Executors.defaultThreadFactory()); } @Override public void run(){ try { serverSocketChannel = AsynchronousServerSocketChannel.open(); } catch (IOException e) { System.out.println(e.getMessage()); return; } if (!serverSocketChannel.isOpen()) { System.out.println("服务启动失败"); return; } try { serverSocketChannel.bind(new InetSocketAddress(port)); } catch (IOException e1) { System.out.println("服务启动失败"); return; } System.out.println("启动服务端口:" + port); while (true) { Future<AsynchronousSocketChannel> socketChannelFuture = serverSocketChannel.accept(); try { AsynchronousSocketChannel socketChannel = socketChannelFuture.get(); Callable<String> worker = new Callable<String>() { @Override public String call() throws Exception { ByteBuffer buffer = ByteBuffer.allocateDirect(1024); socketChannel.read(buffer).get(); buffer.flip(); String request = Charset.defaultCharset().decode(buffer).toString(); System.out.println("收到请求:" + request); String response = String.valueOf(Calculator.conversion(request)); byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); System.out.println("返回响应:" + response); socketChannel.write(writeBuffer); return null; } }; exec.submit(worker); } catch (InterruptedException | ExecutionException e) { System.out.println(e.getMessage()); exec.shutdownNow(); break; } } } public static void main(String[] args) { AioServer server = new AioServer(7070); server.start(); } }
总结
socket通信包含建立连接,写入请求,读取响应先后三个动作。
在Bio中三个操作是同步阻塞的,下一个动作必须等待上个动作完成才进行,相当于每次连接-请求-响应都是一个原子动作,如果有一个动作慢,后面的也只能等,线程也就只能阻塞直到异常或者流程结束。同样,服务端需要为每个连接起一个线程去同步阻塞的读取请求-写回响应-关闭IO资源。
在Nio中,将三个动作拆成了非阻塞模型,连接动作会立即返回结果,如果channel连接没成功,可以以OP_CONNECT作为key将其注册到selector上,然后在selector上进行阻塞,如果是连接完成,则可以写消息同时注册OP_READ等待响应。在服务端,可以在一个selector轮询很多客户端的连接,如果key.isReadable()则读取请求,写回响应,如果channel是isAcceptable,就注册读动作等待可读。
在Nio 2中,又改为异步回调的方式,省掉了在selector上的阻塞轮询动作。
这里只是做个简单的总结,觉得对于Nio的掌握毕竟还比较疏浅,不过会继续学习的,后续有新的心得再进行更新。
本文原创,转载请注明出处:http://www.cnblogs.com/shanhm1991/p/7505518.html
文章参阅:
1.《Netty权威指南》
2.《java并发编程实战》
3. http://codepub.cn/2016/02/26/Asynchronous-non-blocking-message-communication-framework-based-on-Java-NIO2/