Java中的BIO、NIO、AIO-3
这一篇是代码篇,敲代码有助于理解记忆这些抽象的东西:
参考资料:
- http://www.blogjava.net/killme2008/archive/2012/09/17/295743.html Java AIO初探(异步网络IO)
- https://www.ibm.com/developerworks/cn/java/j-lo-nio2/index.html 在 Java 7 中体会 NIO.2 异步执行的快乐
- https://blog.csdn.net/anxpp/article/details/51512200#t3 Java 网络IO编程总结(BIO、NIO、AIO均含完整实例代码)
目录
Java BIO代码
服务器
- package sock;
-
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.io.PrintWriter;
- import java.net.ServerSocket;
- import java.net.Socket;
- import java.util.ArrayList;
- import java.util.LinkedList;
- import java.util.List;
-
-
-
- /*
- 存在多线程对数据结构并发操作不安全的问题
- */
- public class socketServerT extends ServerSocket{
- private static final int port = 2018;
- private static boolean isPrint = false;
- private static List<String> user_list = new ArrayList();
- private static List<ServerThread> threadlist = new ArrayList<>();//应该使用线程安全的集合
- private static LinkedList<String> message = new LinkedList<>();
-
-
- socketServerT() throws IOException{
- super(port);
- new PrintOutThread();
- System.out.println( " server is created");
- try{
- while(true){
- Socket sock = this.accept();
- new ServerThread(sock);
- }
- }catch( Exception e){
- e.printStackTrace();
- }
- }
- class PrintOutThread extends Thread{
- PrintOutThread(){
- System.out.println(getName() + "!!!!!");
- start();
- }
- public void run(){
- while(true){
- if(isPrint){
- String m = message.getFirst();
- for(ServerThread i : threadlist){
- sendMessage(i,m);
- }
- message.removeFirst();
- isPrint = message.size()>0 ? true:false;
- }
- }
- }
- }
- class ServerThread extends Thread{
- private BufferedReader rec;
- private PrintWriter send;
- private Socket client;
- private String name;
-
- ServerThread(Socket sock) throws IOException{
- client = sock;
- rec = new BufferedReader(new InputStreamReader(client.getInputStream()));
- send = new PrintWriter(client.getOutputStream(),true);
- //rec.readLine();
- System.out.println(getName() + "is created");
- send.println("connected to chat room, please input your name!!");
- start();
- }
- public PrintWriter getSend(){
- return send;
- }
- public void run(){
- try{
- int flag = 0;
- String line = "";
- while(!line.contains("bye")){
- line = rec.readLine();
- if("showuser".equals(line)){
- send.println(listOneUsers());
- //line = rec.readLine();
- continue;
- }
- if(flag == 0){
- flag ++;
- name = line;
- user_list.add(name);
- threadlist.add(this);
- send.println(name + " begin to chat");
- pushMessage("client <" + name+"> enter chat room");
- }else{
- pushMessage("client <" + name+"> say :" + line);
- }
- //line = rec.readLine();
- }
-
- }catch (Exception e){
- e.printStackTrace();
- }finally {
- try{
- client.close();
- rec.close();
- send.close();
- }catch (IOException e){
- e.printStackTrace();
- }
- threadlist.remove(this);
- user_list.remove(name);
- pushMessage("client <" + name+"> exit");
- }
-
- }
- }
- public void pushMessage(String mess){
- message.add(mess);
- isPrint = true;
- }
- public String listOneUsers(){
- StringBuffer s = new StringBuffer();
- s.append("---online users---
");
- for( String i:user_list){
- s.append(i + "
");
- }
- s.append("---end---
");
- return s.toString();
- }
- public void sendMessage(ServerThread s,String m){
- //System.out.println("test");
- PrintWriter p = s.getSend();
- p.println(m);
- //p.flush();
- }
- public static void main(String args[]){
- try{
- socketServerT s = new socketServerT();
- }catch(Exception e){
- e.printStackTrace();
- }
- }
- }
-
客户端
- package sock;
-
- import java.io.BufferedReader;
- import java.io.IOException;
- import java.io.InputStreamReader;
- import java.io.PrintWriter;
- import java.net.Socket;
-
- public class socketClientT extends Socket{
- private static final String server = "127.0.0.1";
- private static final int port = 2018;
-
- private Socket sock;
- private PrintWriter send;
- private BufferedReader rec;
- socketClientT() throws IOException{
- super(server,port);
- sock = this;
- send = new PrintWriter(sock.getOutputStream(),true);
- rec = new BufferedReader(new InputStreamReader(sock.getInputStream()));
- Thread t = new recvThread();
- BufferedReader sysBuff = new BufferedReader(new InputStreamReader(System.in));
- String line = "";
- while(! line.contains("bye")){
- line = sysBuff.readLine();
- send.println(line);
- }
- send.close();
- rec.close();
- sysBuff.close();
- this.close();
- }
- class recvThread extends Thread{
- private BufferedReader buff;
- recvThread(){
- try {
- buff = new BufferedReader(new InputStreamReader(sock.getInputStream()));
- start();
- } catch (Exception e){
- e.printStackTrace();
- }
- }
- public void run(){
- String res = "";
- try{
- while(true){
- res = buff.readLine();
- if(res.contains("bye"))
- break;
- System.out.println(res);
- }
- send.close();
- buff.close();
- sock.close();
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- }
-
- public static void main(String args[]){
- try {
- socketClientT s = new socketClientT();
- }catch (Exception e){
- e.printStackTrace();
- }
- }
- }
-
Java NIO
JDK 1.4的java.util.*;
包中引入了新的Java I/O库,其目的是提高IO操作的速度。
简介
NIO我们一般认为是New I/O(也是官方的叫法),因为它是相对于老的I/O类库新增的(其实在JDK 1.4中就已经被引入了,但这个名词还会继续用很久,即使它们在现在看来已经是“旧”的了,所以也提示我们在命名时,需要好好考虑),做了很大的改变。但民间跟多人称之为Non-block I/O,即非阻塞I/O,因为这样叫,更能体现它的特点。而下文中的NIO,不是指整个新的I/O库,而是非阻塞I/O。
NIO提供了与传统BIO模型中的Socket和ServerSocket相对应的SocketChannel和ServerSocketChannel两种不同的套接字通道实现。
新增的着两种通道都支持阻塞和非阻塞两种模式。
阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。
对于低负载、低并发的应用程序,可以使用同步阻塞I/O来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用NIO的非阻塞模式来开发。
下面会先对基础知识进行介绍。
缓冲区Buffer
Buffer是一个对象,包含一些要写入或者读出的数据。
在NIO库中,所有数据都是用缓冲区处理的。在读取数据时,它是直接读到缓冲区中的;在写入数据时,也是写入到缓冲区中。任何时候访问NIO中的数据,都是通过缓冲区进行操作。
缓冲区实际上是一个数组,并提供了对数据结构化访问以及维护读写位置等信息。
具体的缓存区有这些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他们实现了相同的接口:Buffer。
通道Channel
我们对数据的读取和写入要通过Channel,它就像水管一样,是一个通道。通道不同于流的地方就是通道是双向的,可以用于读、写和同时读写操作。
底层的操作系统的通道一般都是全双工的,所以全双工的Channel比流能更好的映射底层操作系统的API。
Channel主要分两大类:
- SelectableChannel:用户网络读写
- FileChannel:用于文件操作
后面代码会涉及的ServerSocketChannel和SocketChannel都是SelectableChannel的子类。
多路复用器
Selector是Java NIO 编程的基础。
Selector提供选择已经就绪的任务的能力:Selector会不断轮询注册在其上的Channel,如果某个Channel上面发生读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。
一个Selector可以同时轮询多个Channel,因为JDK使用了epoll()代替传统的select实现,所以没有最大连接句柄1024/2048的限制。所以,只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。
服务器端代码
server:
- /**
- * server
- */
- public class server {
-
- private static int port = 8000;
- private static serverHandle sHandle;
-
- public static void start() {
- start(port);
- }
- private static synchronized void start(int port) {
- if (sHandle != null) {
- sHandle.setStarted(false);
- }
- sHandle = new serverHandle(port);
- new Thread(sHandle,"server").start();
-
- }
- public static void main(String[] args) {
- start();
- }
- }
serverHandle
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.net.ServerSocket;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.channels.SocketChannel;
- import java.util.Iterator;
- import java.util.Set;
-
- //import jdk.internal.org.objectweb.asm.Handle;
-
- /**
- * serverHandle
- */
- public class serverHandle implements Runnable{
-
- private ServerSocketChannel serverChannel;
- private Selector selector;
- private volatile boolean started;//各个线程都能看到状态
-
- public serverHandle(int port) {
- try {
- //创建选择器
- selector = Selector.open();
- //创建serverSocketChannel
- serverChannel = ServerSocketChannel.open();
- //如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
- serverChannel.configureBlocking(false);
- //创建InetSocketAddress
- InetSocketAddress socketAddress = new InetSocketAddress(port);
- //得到ServerSocket
- ServerSocket serverSocket = serverChannel.socket();
- //绑定ServetSocket到一个具体的端口,并设置backlog
- serverSocket.bind(socketAddress, 1024);
- //向selector注册ServerSocketChannel,设置为监听客户端的连接请求
- serverChannel.register(selector, SelectionKey.OP_ACCEPT);
- //标记服务器状态
- started = true;
- System.out.println("服务器已经启动,端口号:" + port);
-
- } catch (IOException e) {
- //TODO: handle exception
- e.printStackTrace();
- System.exit(1);
- }
- }
-
- public void setStarted(boolean flag) {
- this.started = flag;
- }
-
- public void run() {
- //循环遍历selector
- while (started) {
- try {
- //无论是否有读写事件,selector每个1s唤醒一次
-
- try {
- selector.select(1000);
- } catch (Exception e) {
- e.printStackTrace();
- }
- //获得状态为ready的selectorkey
- Set<SelectionKey> keys = selector.selectedKeys();
- Iterator<SelectionKey> iter = keys.iterator();
- SelectionKey key = null;
- while (iter.hasNext()) {
- key = iter.next();
- iter.remove();
- try {
- handle(key);
- } catch (Exception e) {
-
- if (key != null) {
- key.cancel();
- if (key.channel() != null) {
- key.channel().close();
- }
- }
- }
- }
- } catch (Throwable t) {
-
- t.printStackTrace();
- }
- }
- //关闭selector
- if (selector != null) {
- try {
- selector.close();
- } catch (Exception e) {
-
- e.printStackTrace();
- }
- }
- }
- private void handle(SelectionKey key) throws IOException {
- //判断kei是否是有效的
- if (key.isValid()) {
-
- //处理新接入的请求消息,
- if (key.isAcceptable()) {
- //通过selectionkey得到ServerSocketChannel,注意转型
- ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
- //通过serversocketchannel的accept方法创建SocketChannel实例
- SocketChannel client = serverSocketChannel.accept();
- //设置client为非阻塞模式
- client.configureBlocking(false);
- //把client注册到selector,注册事件为读
- client.register(selector, SelectionKey.OP_READ);
- }
-
- //读消息
- if (key.isReadable()) {
- SocketChannel sc = (SocketChannel) key.channel();
- //创建byteBuffer,大小为1M
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- //读取请求码流,返回读取到的字节数
- int readBytes = sc.read(buffer);
- //读取到字节,对字节进行编码
- if (readBytes > 0) {
- //将缓冲区buffer的position设为0,用于后续对缓冲区的读操作
- buffer.flip();
- //根据缓冲区可读字节数创建字节数组
- byte[] bytes = new byte[buffer.remaining()];
- //将缓冲区可读字节数组复制到新建的数组中
- buffer.get(bytes);
- String expresString = new String(bytes, "UTF-8");
- System.out.println("服务器收到的消息:" + expresString);
-
- //处理数据
- String res = null;
- res = new StringBuffer(expresString).reverse().toString();
- //写入返回消息
- dowrite(sc,res);
- } else if (readBytes < 0) {
- //链路关闭释放资源
- key.cancel();
- sc.close();
- }
- }
- }
- }
- private void dowrite(SocketChannel sc, String res) throws IOException {
- //把字符串编码为字节数组
- byte[] bytes = res.getBytes();
- //根据数组容量创建ByteBuffer
- ByteBuffer wBuffer = ByteBuffer.allocate(bytes.length);
- //把字节数组复制到buffer中
- wBuffer.put(bytes);
- //flip操作,更改position为0,方便后续的写操作从头开始
- wBuffer.flip();
- //发送缓冲区的数据
- sc.write(wBuffer);
- }
- }
客户端代码
client
- import java.util.Scanner;
-
- /**
- * clientChannel
- */
- public class clientChannel {
- private static String host = "127.0.0.1";
- private static int port = 8000;
- private static clientHandle cHandle;
-
- public static void start() {
- start(host,port);
- }
- public static synchronized void start(String host, int port) {
- if (cHandle != null) {
- cHandle.stop();
- }
- cHandle = new clientHandle(host, port);
- new Thread(cHandle, "client").start();;
- }
-
- public static Boolean sendMsg(String msg) throws Exception {
- if (msg.contains("q")) {
- return false;
- }
- cHandle.sendMsg(msg);
- return true;
- }
- public static void main(String[] args) {
- try {
- start();
- Scanner s = new Scanner(System.in);
- String tmp;
- while ((tmp = s.nextLine())!= null) {
- sendMsg(tmp);
- }
- } catch (Exception e) {
- //TODO: handle exception
- e.printStackTrace();
- }
- //start();
- }
- }
clientHandle
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.SocketChannel;
- import java.util.Iterator;
- import java.util.Set;
- import java.io.IOException;
- /**
- * clientHandle
- */
- public class clientHandle implements Runnable{
-
- private String host;
- private int port;
- private Selector selector;
- private SocketChannel socketChannel;
- private volatile boolean started;
-
- public clientHandle(String ip, int port) {
- this.host = ip;
- this.port = port;
-
- try {
- //创建选择器
- this.selector = Selector.open();
- //创建socketchannel
- this.socketChannel = SocketChannel.open();
- //配置socketChannel为非阻塞模式
- this.socketChannel.configureBlocking(false);
- this.started = true;
-
- } catch (Exception e) {
- //TODO: handle exception
- e.printStackTrace();
- System.exit(1);
- }
-
- }
-
- public void stop(){
- this.started = false;
- }
-
- public void doConnection() throws Exception{
- InetSocketAddress address = new InetSocketAddress(this.host, this.port);
- if (socketChannel.connect(address)) {
- System.out.println("连接服务器成功!!!");
- } else {
- System.out.println("未连接成功,下一轮继续!!!");
- //向selector注册socketChannel的连接操作
- socketChannel.register(selector, SelectionKey.OP_CONNECT);
- }
- }
-
- public void handleInput(SelectionKey key) throws Exception{
- if (key.isValid()) {
- //获取SeclectionKey对应的socketChannel
- SocketChannel sc = (SocketChannel) key.channel();
- //测试key对应的socketChannel通道是否已完成或未能完成其套接连接操作
- if (key.isConnectable()) {
- //完成连接过程,返回true表示channel已经建立了连接,false表示建立连接失败
- //当使用SocketChannel的connect()函数进行连接的时候,当处于非阻塞模式的情况下,可能连接不是立刻完成的,需要使用
- //finidhConnect()来检查连接是否建立
- if (sc.finishConnect()) {
- } else {
- System.exit(1);
- }
- }
- //读消息,判断key对应的channel是否可读
- if (key.isReadable()) {
- //创建一个缓冲区,用来存读取的数据,
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- //读取数据,返回读取的字节数
- int readSize = sc.read(buffer);
- //读取到字节,对字节进行编码
- if (readSize > 0) {
- //设置缓冲区的limit和position,方面后面读取数据
- buffer.flip();
- //根据缓冲区的可读字节数创建字节数组
- byte[] bytes = new byte[buffer.remaining()];
- //把缓冲区的内容复制到字节数组中去
- buffer.get(bytes);
- String res = new String(bytes, "UTF-8");
- System.out.println("客户端收到的数据为:" + res);
- } else if (readSize < 0) {
- //这种情况说明链路已经关闭,释放资源
- key.cancel();
- sc.close();
- }
- }
- }
- }
-
- public void doWrite(SocketChannel sc, String request) throws Exception {
- //将数据转换为字节数组
- byte[] bytes = request.getBytes();
- //创建字节缓冲区
- ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
- //将字节数组放入字节缓冲区
- buffer.put(bytes);
- //flip操作,调整limit和position
- buffer.flip();
- //将数据写入到channel中
- sc.write(buffer);
- }
- public void sendMsg(String msg) throws Exception{
- //这里还没明白为什么要先注册读操作?需要注册读操作才能,知道读状态是否就绪,方便handelInput函数处理!!
- //但是还有一个疑问,什么时候使用OP_WRITE
- socketChannel.register(selector, SelectionKey.OP_READ);
- doWrite(socketChannel, msg);
- }
-
- public void run() {
- try{
- doConnection();
- }catch(Exception e){
- e.printStackTrace();
- System.exit(1);
- }
- //循环遍历selector
- while(started){
- try{
- //无论是否有读写事件发生,selector每隔1s被唤醒一次
- selector.select(1000);
- //阻塞,只有当至少一个注册的事件发生的时候才会继续.
- // selector.select();
- Set<SelectionKey> keys = selector.selectedKeys();
- Iterator<SelectionKey> it = keys.iterator();
- SelectionKey key = null;
- while(it.hasNext()){
- key = it.next();
- it.remove();
- try{
- handleInput(key);
- }catch(Exception e){
- if(key != null){
- key.cancel();
- if(key.channel() != null){
- key.channel().close();
- }
- }
- }
- }
- }catch(Exception e){
- e.printStackTrace();
- System.exit(1);
- }
- }
- //selector关闭后会自动释放里面管理的资源
- if(selector != null)
- try{
- selector.close();
- }catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
##测试及解析
测试代码:
- import java.util.Scanner;
-
- /**
- * Test
- */
- public class Test {
-
- public static void main(String[] args) {
- server.start();
- try {
- Thread.sleep(3000);
- } catch (Exception e) {
- //TODO: handle exception
- e.printStackTrace();
- }
- clientChannel.start();
- Scanner s = new Scanner(System.in);
- String tmp;
- try {
- while ((tmp = s.nextLine()) != null) {
- clientChannel.sendMsg(tmp);
- }
- } catch (Exception e) {
- //TODO: handle exception
- e.printStackTrace();
- }
-
- }
-
- }
可以看到,创建NIO服务端的主要步骤如下:
- 打开ServerSocketChannel,监听客户端连接
- 绑定监听端口,设置连接为非阻塞模式
- 创建Reactor线程,创建多路复用器并启动线程
- 将ServerSocketChannel注册到Reactor线程中的Selector上,监听ACCEPT事件
- Selector轮询准备就绪的key
- Selector监听到新的客户端接入,处理新的接入请求,完成TCP三次握手,简历物理链路
- 设置客户端链路为非阻塞模式
- 将新接入的客户端连接注册到Reactor线程的Selector上,监听读操作,读取客户端发送的网络消息
- 异步读取客户端消息到缓冲区
- 对Buffer编解码,处理半包消息,将解码成功的消息封装成Task
- 将应答消息编码为Buffer,调用SocketChannel的write将消息异步发送给客户端
Java AIO
Java nio 2.0的主要改进就是引入了异步IO(包括文件和网络),这里主要介绍下异步网络IO API的使用以及框架的设计,以TCP服务端为例。首先看下为了支持AIO引入的新的类和接口:
** java.nio.channels.AsynchronousChannel**
标记一个channel支持异步IO操作。
** java.nio.channels.AsynchronousServerSocketChannel**
ServerSocket的aio版本,创建TCP服务端,绑定地址,监听端口等。
** java.nio.channels.AsynchronousSocketChannel**
面向流的异步socket channel,表示一个连接。
** java.nio.channels.AsynchronousChannelGroup**
异步channel的分组管理,目的是为了资源共享。一个AsynchronousChannelGroup
绑定一个线程池,这个线程池执行两个任务:处理IO事件和派发CompletionHandler
。AsynchronousServerSocketChannel
创建的时候可以传入一个AsynchronousChannelGroup
,那么通过AsynchronousServerSocketChannel
创建的AsynchronousSocketChannel
将同属于一个组,共享资。
** java.nio.channels.CompletionHandler**
异步IO操作结果的回调接口,用于定义在IO操作完成后所作的回调工作。AIO的API允许两种方式来处理异步操作的结果:返回的Future
模式或者注册CompletionHandler
,推荐用CompletionHandler
的方式,这些handler的调用是由AsynchronousChannelGroup
的线程池派发的。显然,线程池的大小是性能的关键因素。AsynchronousChannelGroup
允许绑定不同的线程池,通过三个静态方法来创建:
public static AsynchronousChannelGroup withFixedThreadPool(int nThreads, ThreadFactory threadFactory) throws IOException
public static AsynchronousChannelGroup withCachedThreadPool(ExecutorService executor, int initialSize)
public static AsynchronousChannelGroup withThreadPool(ExecutorService executor) throws IOException
需要根据具体应用相应调整,从框架角度出发,需要暴露这样的配置选项给用户。
在介绍完了aio引入的TCP的主要接口和类之后,我们来设想下一个aio框架应该怎么设计。参考非阻塞nio框架的设计,一般都是采用Reactor
模式,Reactor负责事件的注册、select、事件的派发;相应地,异步IO有个Proactor模式,Proactor负责CompletionHandler
的派发,查看一个典型的IO写操作的流程来看两者的区别:
Reactor: send(msg) -> 消息队列是否为空,如果为空 -> 向Reactor注册OP_WRITE,然后返回 -> Reactor select -> 触发Writable,通知用户线程去处理 ->先注销Writable(很多人遇到的cpu 100%的问题就在于没有注销),处理Writeable,如果没有完全写入,继续注册OP_WRITE。注意到,写入的工作还是用户线程在处理。
Proactor: send(msg) -> 消息队列是否为空,如果为空,发起read异步调用,并注册CompletionHandler,然后返回。 -> 操作系统负责将你的消息写入,并返回结果(写入的字节数)给Proactor -> Proactor派发CompletionHandler。可见,写入的工作是操作系统在处理,无需用户线程参与。事实上在aio的API中,AsynchronousChannelGroup就扮演了Proactor的角色。
CompletionHandler有三个方法,分别对应于处理成功、失败、被取消(通过返回的Future)情况下的回调处理:
public interface CompletionHandler<V,A> {
void completed(V result, A attachment);
void failed(Throwable exc, A attachment);
void cancelled(A attachment);
}
其中的泛型参数V表示IO调用的结果,而A是发起调用时传入的attchment。
server端代码
server
- //package aio;
-
- public class Server {
- public static int clientCount = 0;
- public static int port = 8000;
- public static String hoString = "127.0.0.1";
-
- public static void start() {
- start(Server.port);
- }
- public static void start(int port) {
- AsyncServerHandler serverHandler = new AsyncServerHandler(port);
- Thread t1 = new Thread(serverHandler);
- t1.start();
- }
- public static void main(String[] args) {
- start();
- }
- }
-
AsyncServerHandler
- //package aio;
-
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.channels.AsynchronousServerSocketChannel;
- import java.nio.channels.CompletionHandler;
- import java.util.concurrent.CountDownLatch;
-
- public class AsyncServerHandler implements Runnable{
- private AsynchronousServerSocketChannel serverSocketChannel;
- private CountDownLatch latch;
- public AsyncServerHandler(int port) {
- // TODO Auto-generated constructor stub
- InetSocketAddress address = new InetSocketAddress(port);
- try {
- serverSocketChannel = AsynchronousServerSocketChannel.open();
- serverSocketChannel.bind(address);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- System.out.println("服务器已经启动,端口号:" + port);
- }
-
-
- public void run() {
- // TODO Auto-generated method stub
- //CountDownLatch初始化
- //它的作用:在完成一组正在执行的操作之前,允许当前的现场一直阻塞
- //此处,让现场在此阻塞,防止服务端执行完成后退出
- //也可以使用while(true)+sleep
- //生成环境就不需要担心这个问题,以为服务端是不会退出的
- this.latch = new CountDownLatch(1);
- serverSocketChannel.accept(this, new AcceptHandler(this.latch));
-
- try {
- latch.await();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- public AsynchronousServerSocketChannel getServerSocketChannel() {
- return serverSocketChannel;
- }
-
- public void setServerSocketChannel(AsynchronousServerSocketChannel serverSocketChannel) {
- this.serverSocketChannel = serverSocketChannel;
- }
-
- public CountDownLatch getLatch() {
- return latch;
- }
-
- public void setLatch(CountDownLatch latch) {
- this.latch = latch;
- }
-
- }
-
AcceptHandler
- //package aio;
-
- import java.nio.ByteBuffer;
- import java.nio.channels.AsynchronousServerSocketChannel;
- import java.nio.channels.AsynchronousSocketChannel;
- import java.nio.channels.CompletionHandler;
- import java.util.concurrent.CountDownLatch;
-
- public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,AsyncServerHandler> {
-
- private CountDownLatch latch;
- public AcceptHandler(CountDownLatch latch) {
- // TODO Auto-generated constructor stub
- this.latch = latch;
- }
-
- public void completed(AsynchronousSocketChannel socketChannel, AsyncServerHandler serverHandler) {
- // TODO Auto-generated method stub
- //进入这个函数说明说明事件处理成功,已经成功的拿到socketChanne
- Server.clientCount ++;
- System.out.println("当前连接的客户数:" + Server.clientCount);
- //继续接受其他客户机的连接,
- AsynchronousServerSocketChannel channel = serverHandler.getServerSocketChannel();
- channel.accept(serverHandler, this);
- //创建新的buffer,为读取数据做准备
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- socketChannel.read(buffer, buffer, new serverReadHandler(socketChannel));
-
- }
-
-
- public void failed(Throwable exc, AsyncServerHandler attachment) {
- // TODO Auto-generated method stub
- exc.printStackTrace();
- this.latch.countDown();
- }
-
-
- }
-
serverReadHandler
- //package aio;
-
- import java.io.IOException;
- import java.io.UnsupportedEncodingException;
- import java.nio.ByteBuffer;
- import java.nio.channels.AsynchronousSocketChannel;
- import java.nio.channels.CompletionHandler;
-
- public class serverReadHandler implements CompletionHandler<Integer,ByteBuffer> {
- //用于读取半包消息和应答消息
- private AsynchronousSocketChannel serverChannel;
- public serverReadHandler(AsynchronousSocketChannel channel) {
- // TODO Auto-generated constructor stub
- this.serverChannel = channel;
- }
-
-
-
- public void completed(Integer result, ByteBuffer buffer) {
- // TODO Auto-generated method stub
- //操作系统读取IO就绪之后,进入这个函数
- //调整limit和position关系,方便读取
- //buffer.flip();
- if (buffer.hasRemaining()) {
- byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- String msg = null;
- try {
- msg = new String(bytes, "UTF-8");
- } catch (UnsupportedEncodingException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- System.out.println("服务器收到消息:" + msg);
-
- String calResult = null;
- StringBuffer stringBuffer = new StringBuffer(msg);
- calResult = stringBuffer.reverse().toString();
- //向客户端发送结果
- byte[] resultBytes = calResult.getBytes();
- ByteBuffer rBuffer = ByteBuffer.allocate(resultBytes.length);
- rBuffer.put(resultBytes);
- this.serverChannel.write(rBuffer, rBuffer, new ServerWriteHandler(this.serverChannel));
- }else {
- System.out.println("服务器没有读取到数据");
- }
- }
-
-
- public void failed(Throwable exc, ByteBuffer buffer) {
- // TODO Auto-generated method stub
- try {
- this.serverChannel.close();
- System.out.println("服务器socket关闭~~~");
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- }
-
serverWriteHandler
- //package aio;
-
- import java.io.IOException;
- import java.nio.ByteBuffer;
- import java.nio.channels.AsynchronousSocketChannel;
- import java.nio.channels.CompletionHandler;
-
- public class ServerWriteHandler implements CompletionHandler<Integer,ByteBuffer> {
- private AsynchronousSocketChannel serverChannel;
- public ServerWriteHandler(AsynchronousSocketChannel channel) {
- // TODO Auto-generated constructor stub
- this.serverChannel = channel;
- }
-
- public void completed(Integer result, ByteBuffer buffer) {
- // TODO Auto-generated method stub
- //调整limit和position的位置,方便下面输出使用
- //buffer.flip();
- if (buffer.hasRemaining()) {
- System.out.println("服务器输出数据~~~");
- buffer.clear();
- //向客户端写入数据
- this.serverChannel.write(buffer, buffer, this);
- } else {
- //读取数据
- ByteBuffer readBuffer = ByteBuffer.allocate(1024);
- this.serverChannel.read(readBuffer, readBuffer, new serverReadHandler(this.serverChannel));
- }
- }
-
- public void failed(Throwable exc, ByteBuffer buffer) {
- // TODO Auto-generated method stub
- //出现异常关闭socketchannel
- try {
- this.serverChannel.close();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- }
-
客户端
client
- //package aio;
-
- import java.util.Scanner;
-
- public class client {
- private static String DEFAULT_HOST = "127.0.0.1";
- private static int DEFAULT_PORT = 8000;
- private static AsyncClientHandler clientHandle;
- public static void start(){
- start(DEFAULT_HOST,DEFAULT_PORT);
- }
- public static synchronized void start(String ip,int port){
- if(clientHandle!=null)
- return;
- clientHandle = new AsyncClientHandler(ip,port);
- new Thread(clientHandle,"Client").start();
- }
- //向服务器发送消息
- public static boolean sendMsg(String msg) throws Exception{
- if(msg.equals("q")) return false;
- clientHandle.sendMsg(msg);
- return true;
- }
- //@SuppressWarnings("resource")
- public static void main(String[] args) throws Exception{
- //System.out.println("请输入请求消息:");
- start();
- System.out.println("请输入请求消息:");
- Scanner scanner = new Scanner(System.in);
- String tmp = null;
- for (int i = 0; i < 10; i++) {
- tmp = scanner.nextLine();
- clientHandle.sendMsg(tmp);
- }
-
- }
- }
-
AsyncClientHandler
- //package aio;
-
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.ByteBuffer;
- import java.nio.channels.AsynchronousSocketChannel;
- import java.nio.channels.CompletionHandler;
- import java.util.concurrent.CountDownLatch;
-
- public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable{
- private AsynchronousSocketChannel clientChannel;
- private String host;
- private int port;
- private CountDownLatch latch;
-
- public AsyncClientHandler(String host, int port) {
- // TODO Auto-generated constructor stub
- this.host = host;
- this.port = port;
- try {
- //创建异步客户端通道
- clientChannel = AsynchronousSocketChannel.open();
- } catch (Exception e) {
- // TODO: handle exception
- e.printStackTrace();
- }
- }
-
-
-
- public void run() {
- // TODO Auto-generated method stub
- latch = new CountDownLatch(1);
- //创建InetScoketAddress
- InetSocketAddress address = new InetSocketAddress(this.host, this.port);
- //发起异步连接操作,回调参数是这个类本身,如果连接成功会回调completed方法
- clientChannel.connect(address, this, this);
- try {
- latch.await();
- } catch (InterruptedException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- try {
- clientChannel.close();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
-
- public void completed(Void result, AsyncClientHandler attachment) {
- // TODO Auto-generated method stub
- //连接服务器成功,就会调用这个函数
- System.out.println("连接服务器成功!!!!");
- }
-
-
- public void failed(Throwable exc, AsyncClientHandler attachment) {
- // TODO Auto-generated method stub
- //连接服务器失败,会调用这个函数
- System.out.println("连接服务器失败!!!");
- exc.printStackTrace();
-
- try {
- clientChannel.close();
- latch.countDown();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- //向服务器发送消息
- public void sendMsg(String msg) {
- System.out.println(msg);
- byte[] bytes = msg.getBytes();
- ByteBuffer buffer = ByteBuffer.allocate(bytes.length);
- buffer.put(bytes);
- //进行flip操作
- buffer.flip();
- //异步写
- clientChannel.write(buffer, buffer, new WriteHandler(clientChannel,latch));
-
-
- }
-
-
- }
-
-
writeHandler
- //package aio;
-
- import java.io.IOException;
- import java.nio.ByteBuffer;
- import java.nio.channels.AsynchronousSocketChannel;
- import java.nio.channels.CompletionHandler;
- import java.util.concurrent.CountDownLatch;
-
- public class WriteHandler implements CompletionHandler<Integer, ByteBuffer>{
-
- private AsynchronousSocketChannel clientChannel;
- private CountDownLatch latch;
- public WriteHandler(AsynchronousSocketChannel channel, CountDownLatch latch) {
- // TODO Auto-generated constructor stub
- this.clientChannel = channel;
- this.latch = latch;
- }
-
- public void completed(Integer result, ByteBuffer buffer) {
- // TODO Auto-generated method stub
- System.out.println("发送数据成功!~~~");
- //进行flip操作
- //buffer.flip();
- //判断buffer中是否有需要发送的数据,如果有数据就进行发送,如果没有就进行读取操作
- if (buffer.hasRemaining()) {
- System.out.println("进入写数据!!!");
- clientChannel.write(buffer, buffer, this);
- System.out.println("发送数据成功~~~");
- }
- //读取数据
- System.out.println("进入读取数据");
- ByteBuffer readBuffer = ByteBuffer.allocate(1024);
- clientChannel.read(readBuffer, readBuffer, new ReadHandle(clientChannel,latch));
-
- }
-
-
- public void failed(Throwable exc, ByteBuffer buffer) {
- // TODO Auto-generated method stub
- System.err.println("发送数据失败~~~");
- try {
- clientChannel.close();
- latch.countDown();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- }
-
- }
-
ReadHandle
- //package aio;
-
- import java.io.IOException;
- import java.io.UnsupportedEncodingException;
- import java.nio.ByteBuffer;
- import java.nio.channels.AsynchronousSocketChannel;
- import java.nio.channels.CompletionHandler;
- import java.util.concurrent.CountDownLatch;
-
- public class ReadHandle implements CompletionHandler<Integer,ByteBuffer> {
- private AsynchronousSocketChannel clientChannel;
- private CountDownLatch latch;
- public ReadHandle(AsynchronousSocketChannel channel, CountDownLatch latch) {
- // TODO Auto-generated constructor stub
- this.latch = latch;
- this.clientChannel = channel;
- }
-
- public void completed(Integer result, ByteBuffer buffer) {
- // TODO Auto-generated method stub
- //调整buffer的limit和position方便获取收到的数据
- //buffer.flip();
- System.out.println("读取数据成功!!!");
- byte[] bytes = new byte[buffer.remaining()];
- buffer.get(bytes);
- String res;
- try {
- res = new String(bytes, "UTF-8");
- System.out.println("收到的数据: " + res);
- } catch (UnsupportedEncodingException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
-
-
-
- }
-
- public void failed(Throwable exc, ByteBuffer attachment) {
- // TODO Auto-generated method stub
- System.err.println("读取数据失败~~~");
-
- try {
- clientChannel.close();
- this.latch.countDown();
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
-
- }
-