这里,我将做一个简单的通信程序,分别使用三种原始的通信工具:BIO,NIO,AIO。
功能就是一个服务器,一个客户端。服务器就是处理请求,返回响应。而客户端就是连接服务器,发送请求,接收响应。
第一步:建立通信接口
import java.util.Date; public class Test { public static void main(String[] args) { new Test().test(); } private static final long time = new Date().getTime(); private boolean run = true; public void test(){ final Server server = new BioServer(){ @Override public String service(String request) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } if(request != null) log("收到请求内容:" + request); else log("收到一个空的请求"); return "这是一个响应信息!"; } }; final Client client = new BioClient(){ @Override public String running(String response) { if(response != null) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } log("收到响应内容:" + response); } return "这是一个请求信息!"; } }; // 模拟请求与响应 new Thread(){ public void run(){ String request = null; String response = null; while(run) { request = client.running(response); if(run) response = server.service(request); } } }.start(); // 3秒后停止运行 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } run = false; } // ================================================== // base // ================================================== public abstract class Log { public String getName(){ return toString(); } public void log(String contents){ System.out.println(getName() + " -> " + (new Date().getTime() - time) + ": " + getName() + " -> " + contents); } } public abstract class Server extends Log { public String getName(){ return "服务器"; } /** * 处理请求,并返回需要发送的响应信息 */ public abstract String service(String request); } public abstract class Client extends Log { public String getName(){ return "客户端"; } /** * 接收响应,并返回需要发送的请求信息 */ public abstract String running(String response); } // ================================================== // BIO // ================================================== public abstract class BioServer extends Server { } public abstract class BioClient extends Client { } // ================================================== // NIO // ================================================== public abstract class NioServer extends Server { } public abstract class NioClient extends Client { } // ================================================== // AIO // ================================================== public abstract class AioServer extends Server { } public abstract class AioClient extends Client { } }
Server抽象类,定义了一个处理请求,并返回响应的方法service(...)。Client抽象类,定义了一个发送请求,并接收响应的方法running(...)。
这两个方法实现,在应用层。这里的应用层,指的是实际应用的场景,方法test()就是一个应用的场所,这里做了一个简单的应用。
分别为Server和Clinet建立BIO,NIO和AIO的继承类,后续就是如果实现他们。
运行第一步建立的结果如下:
服务器 -> 505: 服务器 -> 收到请求内容:这是一个请求信息! 客户端 -> 1005: 客户端 -> 收到响应内容:这是一个响应信息! 服务器 -> 1508: 服务器 -> 收到请求内容:这是一个请求信息! 客户端 -> 2010: 客户端 -> 收到响应内容:这是一个响应信息! 服务器 -> 2511: 服务器 -> 收到请求内容:这是一个请求信息! 客户端 -> 3013: 客户端 -> 收到响应内容:这是一个响应信息!
每次输出分两行,一行是时间,从0开始的时间,接着一行就是内容。
第二步:建立App运行场景
import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; public class Test { public static void main(String[] args) { new Test().test(); } private static final long time = new Date().getTime(); public void test(){ final Server server = new BioServer(){ @Override public String service(String request) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } if(request != null) log("收到请求内容:" + request); else log("收到一个空的请求"); return "这是一个响应信息!"; } }; final Client client = new BioClient(){ @Override public String running(String response) { if(response != null) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } log("收到响应内容:" + response); } return "这是一个请求信息!"; } }; // 模拟App server.start(); client.start(); // 3秒后停止运行 try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } client.stop(); server.stop(); client.waitStop(); server.waitStop(); } // ================================================== // base // ================================================== public abstract class Log { public String getName(){ return toString(); } public final void log(String contents){ System.out.println(getName() + " -> " + (new Date().getTime() - time) + ": " + getName() + " -> " + contents); } } public abstract class App extends Log { protected boolean run = false; private final AtomicBoolean running = new AtomicBoolean(false); protected abstract void run(); public final void start(){ if(run || running.get()) return ; run = true; new Thread(){ public void run() { try { log("启动"); running.compareAndSet(false, true); App.this.run(); } finally { running.compareAndSet(true, false); log("已停止"); } } }.start(); } public final void stop() { if(!run) return ; log("停止开始!"); run = false; } public final void waitStop() { if(run) return ; while(running.get()){ log("等待停止中……"); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } } public abstract class Server extends App { public String getName(){ return "服务器"; } /** * 处理请求,并返回需要发送的响应信息 */ public abstract String service(String request); } public abstract class Client extends App { public String getName(){ return "客户端"; } /** * 接收响应,并返回需要发送的请求信息 */ public abstract String running(String response); } // ================================================== // BIO // ================================================== public abstract class BioServer extends Server { private List<Service> services = new ArrayList<Service>(); private AtomicInteger nextServiceNameNum = new AtomicInteger(1); protected void run(){ try { while(run){ log("接收客户端连接中……"); // 注:这里不使用线程池,使用笨重的方法。 log("接收到一个客户端连接"); Service service = new Service(); services.add(service); service.start(); // 只接收一个客户端 while(run){ try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } } finally { for(Service service : services) service.stop(); for(Service service : services) service.waitStop(); services.clear(); } } private class Service extends App { private final String name; public Service() { name = BioServer.this.getName() + "/" + "服务" + nextServiceNameNum.getAndIncrement(); } public final String getName(){ return name; } @Override public void run() { String request = null; String response = null; while(run) { // TODO 接收请求 request = "未实现接收请求"; // 应用处理 response = service(request); // TODO 发送响应 } } } } public abstract class BioClient extends Client { private boolean connected = false; public void run(){ try { while(connect() && running()){ // 关闭连接后,将重新连接 closeConnect(); } } finally { // 退出运行时,关闭连接 closeConnect(); } } /** * 连接,并返回是否继续运行。 */ private boolean connect(){ log("连接服务器中……"); while(run){ // TODO 连接服务器 log("连接服务器成功!"); connected = true; return true; } log("连接服务器中止!"); return run; } /** * 关闭连接 */ private void closeConnect(){ if(!connected) return ; connected = false; } /** * 运行,并返回是否重新连接。 */ private boolean running(){ String request = null; String response = null; while(run) { // TODO 应用运行 request = running(response); // 发送请求 // TODO 接收响应 response = "未实现接收响应"; } return run; } } // ================================================== // NIO // ================================================== public abstract class NioServer extends Server { protected void run(){ } } public abstract class NioClient extends Client { public void run(){ } } // ================================================== // AIO // ================================================== public abstract class AioServer extends Server { protected void run(){ } } public abstract class AioClient extends Client { public void run(){ } } }
服务器与客户端,实际是两个不同的App,因此Server与Client都继承于App类。
App主要负责启动与停止两个操作,运行结果如下:
服务器 -> 0: 服务器 -> 启动 客户端 -> 0: 客户端 -> 启动 服务器 -> 0: 服务器 -> 接收客户端连接中…… 服务器 -> 0: 服务器 -> 接收到一个客户端连接 客户端 -> 0: 客户端 -> 连接服务器中…… 客户端 -> 0: 客户端 -> 连接服务器成功! 服务器/服务1 -> 0: 服务器/服务1 -> 启动 服务器 -> 500: 服务器 -> 收到请求内容:未实现接收请求 客户端 -> 500: 客户端 -> 收到响应内容:未实现接收响应 客户端 -> 1000: 客户端 -> 收到响应内容:未实现接收响应 服务器 -> 1000: 服务器 -> 收到请求内容:未实现接收请求 服务器 -> 1501: 服务器 -> 收到请求内容:未实现接收请求 客户端 -> 1501: 客户端 -> 收到响应内容:未实现接收响应 客户端 -> 2001: 客户端 -> 收到响应内容:未实现接收响应 服务器 -> 2001: 服务器 -> 收到请求内容:未实现接收请求 客户端 -> 2502: 客户端 -> 收到响应内容:未实现接收响应 服务器 -> 2502: 服务器 -> 收到请求内容:未实现接收请求 服务器 -> 3002: 服务器 -> 收到请求内容:未实现接收请求 客户端 -> 3002: 客户端 -> 停止开始! 客户端 -> 3002: 客户端 -> 收到响应内容:未实现接收响应 服务器 -> 3002: 服务器 -> 停止开始! 服务器 -> 3002: 服务器 -> 等待停止中…… 客户端 -> 3002: 客户端 -> 已停止 服务器/服务1 -> 3012: 服务器/服务1 -> 停止开始! 服务器/服务1 -> 3012: 服务器/服务1 -> 等待停止中…… 服务器 -> 3202: 服务器 -> 等待停止中…… 服务器/服务1 -> 3213: 服务器/服务1 -> 等待停止中…… 服务器 -> 3402: 服务器 -> 等待停止中…… 服务器/服务1 -> 3413: 服务器/服务1 -> 等待停止中…… 服务器 -> 3502: 服务器 -> 收到请求内容:未实现接收请求 服务器/服务1 -> 3502: 服务器/服务1 -> 已停止 服务器 -> 3606: 服务器 -> 等待停止中…… 服务器 -> 3616: 服务器 -> 已停止
第三步:实现BIO
个人感觉BIO是最好写的,但同时它占用线程是比较多。
import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; public class Test { public static void main(String[] args) throws InterruptedException { new Test().test(); } private static final long time = new Date().getTime(); public void test() throws InterruptedException{ final Server server = new BioServer(){ @Override public String service(String request) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } if(request == null) // system error? return "???"; if(request.contains("你好")) return "你好!你是?"; if(request.contains("我是")) return "哈哈!我记得了!"; return "不想跟你说话"; } }; final BioClient client1 = new BioClient(){ public String getName(){ return "客户端A"; } @Override public String running(String response) { if(response == null) return "你好!"; try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } if(response.contains("你是")) return "我是客户端A!你不得了吗?"; if(response.contains("我记得")) return "记得就好!上次说好要请我吃饭的,什么时候……"; return "不想跟你说话!"; } }; final BioClient client2 = new BioClient(){ public String getName(){ return "客户端B"; } @Override public String running(String response) { if(response == null) return "你好!"; try { Thread.sleep(700); } catch (InterruptedException e) { e.printStackTrace(); } if(response.contains("你是")) return "我是客户端B!你不得了吗?"; if(response.contains("我记得")) return "出门忘带钱包了,能不能先错我点?"; return "不想跟你说话!"; } }; // 模拟App server.start(); client1.start(); client2.start(); // 3秒后停止运行 Thread.sleep(3000); client1.stop(); client2.stop(); server.stop(); client1.waitStop(); client2.waitStop(); server.waitStop(); } // ================================================== // base // ================================================== public abstract class Log { public String getName(){ return toString(); } public final void log(String contents){ System.out.println(getName() + " -> " + (new Date().getTime() - time) + ": " + getName() + " -> " + contents); } public final void logError(String contents){ System.out.println(getName() + " -> " + (new Date().getTime() - time) + ": " + getName() + " -> " + contents + "!!!!!!!!!!!!!!!!!!!!!!!!!!"); } } public interface App { public abstract void start(); public abstract void stop(); public abstract void waitStop(); } public abstract class AbstractApp extends Log implements App { protected boolean run = false; private final AtomicBoolean running = new AtomicBoolean(false); protected abstract void run(); public final void start(){ if(run || running.get()) return ; run = true; new Thread(){ public void run() { try { log("启动"); running.compareAndSet(false, true); AbstractApp.this.run(); } finally { running.compareAndSet(true, false); log("已停止"); } } }.start(); } protected void stopImpl(){ } public final void stop() { if(!run) return ; if(running.get()) log("停止开始!"); run = false; stopImpl(); } public final void waitStop() { if(run) return ; while(running.get()){ log("等待停止中……"); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } } public abstract class Server extends AbstractApp { public String getName(){ return "服务器"; } /** * 处理请求,并返回需要发送的响应信息 */ public abstract String service(String request); } public interface Client extends App { /** * 接收响应,并返回需要发送的请求信息 */ public abstract String running(String response); } public class Session { protected static final int RECEIVE_MAX_LEN = 100; private ByteBuffer receiveContents = ByteBuffer.allocate(RECEIVE_MAX_LEN); private boolean receiveOutMaxLen = false; protected final String onReceive(ByteBuffer buffer){ while(buffer.hasRemaining()){ byte b = buffer.get(); if(b == 0){ if(!receiveOutMaxLen){ receiveContents.flip(); String result = new String(receiveContents.array(), 0, receiveContents.limit(), Charset.defaultCharset()); receiveContents.clear(); return result; } else { return "error.outLen"; } } else if(receiveOutMaxLen) { // 处理接收数据的长度超出限制的情况,这里处理方式为一直接收,无时间限制。 // if(!run) // return null; } else if(receiveContents.hasRemaining()) { receiveContents.put(b); } else { receiveOutMaxLen = true; receiveContents.clear(); } } return null; } } // ================================================== // BIO // ================================================== public abstract class BioSocketApp extends AbstractApp { public BioSocketApp(){ this(null); } public BioSocketApp(Socket socket) { this.socket = socket; } protected Socket socket = null; private ByteBuffer buffer = ByteBuffer.allocate(Session.RECEIVE_MAX_LEN); private Session session = new Session(); protected String receive(String target){ log("正在接收" + target + "……"); // 解析 <-> 读取 while(run){ // 解析 buffer.flip(); try { String contents = session.onReceive(buffer); if(contents != null) return contents; } finally { buffer.compact(); } while(run) { int count = -1; while(run){ try { count = socket.getInputStream().read(buffer.array()); break; } catch (IOException e) { logError(e.getMessage()); // e.printStackTrace(); } } if(count > 0 ){ buffer.position(buffer.position() + count); break ; } if(count == -1){ return "disconnect"; } } } return null; } protected final boolean send(String target, String response){ log("发送" + target + ":" + response + "!"); ByteBuffer buffer = ByteBuffer.wrap((response + (char)0).getBytes()); log("正在发送" + target + "……"); try { socket.getOutputStream().write(buffer.array(), buffer.position(), buffer.limit()); } catch (IOException e) { log(e.getMessage()); // e.printStackTrace(); return run; } log("发送" + target + "成功!"); return run; } } public abstract class BioServer extends Server { private List<Service> services = new ArrayList<Service>(); private AtomicInteger nextServiceNameNum = new AtomicInteger(1); protected void run(){ ServerSocket acceptor = null; try { acceptor = new ServerSocket(); acceptor.bind(new InetSocketAddress("localhost", 8080)); acceptor.setSoTimeout(1000); while(run){ log("接收客户端连接中……"); Socket socket = null; try { socket = acceptor.accept(); } catch (SocketTimeoutException e) { continue ; } catch (IOException e) { logError(e.getMessage()); // e.printStackTrace(); socket.close(); continue; } if(!run) break; // 注:这里不使用线程池,使用笨重的方法。 log("接收到一个客户端连接"); Service service = new Service(socket); services.add(service); service.start(); } } catch (IOException e) { logError(e.getMessage()); // e.printStackTrace(); } finally { if(acceptor == null) return ; for(Service service : services) service.stop(); for(Service service : services) service.waitStop(); services.clear(); try { acceptor.close(); } catch (IOException e) { e.printStackTrace(); } } } private class Service extends BioSocketApp { private final String name; public Service(Socket socket) { super(socket); name = BioServer.this.getName() + "/" + "服务" + nextServiceNameNum.getAndIncrement(); } public final String getName(){ return name; } @Override public void run() { try { socket.setSoTimeout(1000); while(run) { String request = receive("客户端请求"); if(request != null) { if(request.equals("disconnect")) break; if(request.startsWith("error")) { if(send("响应", "请求格式不正确!")) continue; break; } log("接收到客户端请求:" + request); String response = service(request); if(response != null && !send("响应", response)) break; } } } catch (IOException e) { e.printStackTrace(); } finally { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } log("与客户端断开连接!"); } } } } public abstract class BioClient extends BioSocketApp implements Client { public BioClient() { } private boolean connected = false; public String getName(){ return "Bio客户端"; } public void run(){ try { // 连接 -> 运行 while(connect() && running()){ // 关闭连接后,将重新连接 closeConnect(); } } finally { // 退出运行时,关闭连接 closeConnect(); } } /** * 连接,并返回是否继续运行。 */ private boolean connect(){ if(socket != null) throw new RuntimeException("socket not closed!"); log("正在连接服务器中……"); InetSocketAddress address = new InetSocketAddress("localhost", 8080); while(run){ try { socket = new Socket(); socket.connect(address); socket.setSoTimeout(1000); connected = true; log("连接服务器成功!"); return true; } catch (IOException e) { logError(e.getMessage()); // e.printStackTrace(); closeConnect(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } return false; } /** * 关闭连接 */ private void closeConnect(){ if(socket != null){ if(connected) { connected = false; log("与服务器断开连接!"); } try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null; } } /** * 运行,并返回是否重新连接。 */ private boolean running(){ String response = null; // 循环请求 while(run){ // 处理响应,获取请求 String request = running(response); // 退出运行 if(request == null) break ; // 清空响应 response = null; // 发送请求 if(send("请求", request)){ // 接收响应 response = receive("服务器响应"); if(response != null){ if(response.equals("disconnect")) break; if(!response.startsWith("error")) log("接收到服务器响应:" + response); else log("接收服务器响应时,发生错误:" + response); } else { response = "无回应!"; } } } return run; } } // ================================================== // NIO // ================================================== // 省略……
// ================================================== // AIO // ================================================== // 省略…… }
运行结果:
服务器 -> 0: 服务器 -> 启动 客户端A -> 0: 客户端A -> 启动 客户端B -> 0: 客户端B -> 启动 客户端B -> 0: 客户端B -> 正在连接服务器中…… 客户端A -> 0: 客户端A -> 正在连接服务器中…… 服务器 -> 11: 服务器 -> 接收客户端连接中…… 客户端B -> 11: 客户端B -> 连接服务器成功! 客户端B -> 11: 客户端B -> 发送请求:你好!! 客户端B -> 11: 客户端B -> 正在发送请求…… 服务器 -> 11: 服务器 -> 接收到一个客户端连接 客户端A -> 11: 客户端A -> 连接服务器成功! 客户端A -> 11: 客户端A -> 发送请求:你好!! 客户端A -> 11: 客户端A -> 正在发送请求…… 客户端A -> 11: 客户端A -> 发送请求成功! 客户端B -> 11: 客户端B -> 发送请求成功! 客户端A -> 11: 客户端A -> 正在接收服务器响应…… 客户端B -> 11: 客户端B -> 正在接收服务器响应…… 服务器 -> 11: 服务器 -> 接收客户端连接中…… 服务器/服务1 -> 11: 服务器/服务1 -> 启动 服务器 -> 11: 服务器 -> 接收到一个客户端连接 服务器/服务1 -> 11: 服务器/服务1 -> 正在接收客户端请求…… 服务器 -> 11: 服务器 -> 接收客户端连接中…… 服务器/服务1 -> 11: 服务器/服务1 -> 接收到客户端请求:你好! 服务器/服务2 -> 11: 服务器/服务2 -> 启动 服务器/服务2 -> 11: 服务器/服务2 -> 正在接收客户端请求…… 服务器/服务2 -> 11: 服务器/服务2 -> 接收到客户端请求:你好! 服务器/服务1 -> 517: 服务器/服务1 -> 发送响应:你好!你是?! 服务器/服务2 -> 517: 服务器/服务2 -> 发送响应:你好!你是?! 服务器/服务1 -> 517: 服务器/服务1 -> 正在发送响应…… 服务器/服务2 -> 517: 服务器/服务2 -> 正在发送响应…… 服务器/服务2 -> 517: 服务器/服务2 -> 发送响应成功! 服务器/服务2 -> 517: 服务器/服务2 -> 正在接收客户端请求…… 服务器/服务1 -> 517: 服务器/服务1 -> 发送响应成功! 客户端B -> 517: 客户端B -> 接收到服务器响应:你好!你是? 客户端A -> 517: 客户端A -> 接收到服务器响应:你好!你是? 服务器/服务1 -> 517: 服务器/服务1 -> 正在接收客户端请求…… 客户端A -> 1020: 客户端A -> 发送请求:我是客户端A!你不得了吗?! 服务器 -> 1020: 服务器 -> 接收客户端连接中…… 客户端A -> 1020: 客户端A -> 正在发送请求…… 客户端A -> 1020: 客户端A -> 发送请求成功! 客户端A -> 1020: 客户端A -> 正在接收服务器响应…… 服务器/服务1 -> 1020: 服务器/服务1 -> 接收到客户端请求:我是客户端A!你不得了吗? 客户端B -> 1222: 客户端B -> 发送请求:我是客户端B!你不得了吗?! 客户端B -> 1222: 客户端B -> 正在发送请求…… 客户端B -> 1222: 客户端B -> 发送请求成功! 客户端B -> 1222: 客户端B -> 正在接收服务器响应…… 服务器/服务2 -> 1222: 服务器/服务2 -> 接收到客户端请求:我是客户端B!你不得了吗? 服务器/服务1 -> 1522: 服务器/服务1 -> 发送响应:哈哈!我记得了!! 服务器/服务1 -> 1522: 服务器/服务1 -> 正在发送响应…… 服务器/服务1 -> 1522: 服务器/服务1 -> 发送响应成功! 服务器/服务1 -> 1522: 服务器/服务1 -> 正在接收客户端请求…… 客户端A -> 1522: 客户端A -> 接收到服务器响应:哈哈!我记得了! 服务器/服务2 -> 1725: 服务器/服务2 -> 发送响应:哈哈!我记得了!! 服务器/服务2 -> 1725: 服务器/服务2 -> 正在发送响应…… 服务器/服务2 -> 1725: 服务器/服务2 -> 发送响应成功! 服务器/服务2 -> 1725: 服务器/服务2 -> 正在接收客户端请求…… 客户端B -> 1725: 客户端B -> 接收到服务器响应:哈哈!我记得了! 服务器 -> 2020: 服务器 -> 接收客户端连接中…… 客户端A -> 2022: 客户端A -> 发送请求:记得就好!上次说好要请我吃饭的,什么时候……! 客户端A -> 2022: 客户端A -> 正在发送请求…… 客户端A -> 2022: 客户端A -> 发送请求成功! 客户端A -> 2022: 客户端A -> 正在接收服务器响应…… 服务器/服务1 -> 2022: 服务器/服务1 -> 接收到客户端请求:记得就好!上次说好要请我吃饭的,什么时候…… 客户端B -> 2427: 客户端B -> 发送请求:出门忘带钱包了,能不能先错我点?! 客户端B -> 2427: 客户端B -> 正在发送请求…… 客户端B -> 2427: 客户端B -> 发送请求成功! 客户端B -> 2427: 客户端B -> 正在接收服务器响应…… 服务器/服务2 -> 2427: 服务器/服务2 -> 接收到客户端请求:出门忘带钱包了,能不能先错我点? 服务器/服务1 -> 2527: 服务器/服务1 -> 发送响应:不想跟你说话! 服务器/服务1 -> 2527: 服务器/服务1 -> 正在发送响应…… 服务器/服务1 -> 2527: 服务器/服务1 -> 发送响应成功! 服务器/服务1 -> 2527: 服务器/服务1 -> 正在接收客户端请求…… 客户端A -> 2527: 客户端A -> 接收到服务器响应:不想跟你说话 服务器/服务2 -> 2927: 服务器/服务2 -> 发送响应:不想跟你说话! 服务器/服务2 -> 2927: 服务器/服务2 -> 正在发送响应…… 服务器/服务2 -> 2927: 服务器/服务2 -> 发送响应成功! 客户端B -> 2927: 客户端B -> 接收到服务器响应:不想跟你说话 服务器/服务2 -> 2927: 服务器/服务2 -> 正在接收客户端请求…… 客户端A -> 3007: 客户端A -> 停止开始! 客户端B -> 3007: 客户端B -> 停止开始! 服务器 -> 3007: 服务器 -> 停止开始! 客户端A -> 3007: 客户端A -> 等待停止中…… 客户端A -> 3027: 客户端A -> 发送请求:不想跟你说话!! 客户端A -> 3027: 客户端A -> 正在发送请求…… 服务器/服务1 -> 3027: 服务器/服务1 -> 停止开始! 服务器/服务2 -> 3027: 服务器/服务2 -> 停止开始! 客户端A -> 3027: 客户端A -> 发送请求成功! 服务器/服务1 -> 3027: 服务器/服务1 -> 等待停止中…… 客户端A -> 3027: 客户端A -> 与服务器断开连接! 服务器/服务1 -> 3027: 服务器/服务1 -> 与客户端断开连接! 服务器/服务1 -> 3027: 服务器/服务1 -> 已停止 客户端A -> 3027: 客户端A -> 已停止 客户端B -> 3208: 客户端B -> 等待停止中…… 服务器/服务2 -> 3228: 服务器/服务2 -> 等待停止中…… 客户端B -> 3408: 客户端B -> 等待停止中…… 服务器/服务2 -> 3428: 服务器/服务2 -> 等待停止中…… 客户端B -> 3608: 客户端B -> 等待停止中…… 服务器/服务2 -> 3628: 服务器/服务2 -> 等待停止中…… 客户端B -> 3628: 客户端B -> 发送请求:不想跟你说话!! 客户端B -> 3628: 客户端B -> 正在发送请求…… 客户端B -> 3628: 客户端B -> 发送请求成功! 客户端B -> 3628: 客户端B -> 与服务器断开连接! 客户端B -> 3628: 客户端B -> 已停止 服务器/服务2 -> 3628: 服务器/服务2 -> 与客户端断开连接! 服务器/服务2 -> 3628: 服务器/服务2 -> 已停止 服务器 -> 3808: 服务器 -> 等待停止中…… 服务器 -> 3828: 服务器 -> 已停止
第四步:实现NIO
NIO如果不设置成阻塞,其实跟BIO一样。触发selector事件,客户端除了需要注册,连接,还要在触发事件时使用方法finishConnect()。另外,Selector的select()方法会阻塞,阻塞的时候,是不允许注册的,反则注册方法也会被阻塞,即死锁,解决的方法就是使用wakeup()方法,唤醒Selector线程,让Selector进入一个同步阻塞,再执行异步注册。
import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Date; import java.util.Iterator; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; public class Test { public static void main(String[] args) throws InterruptedException { new Test().test(); } private static final long time = new Date().getTime(); public void test() throws InterruptedException{ final Server server = new NioServer(){ @Override public String service(String request) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } if(request == null) // system error? return "???"; if(request.contains("你好")) return "你好!你是?"; if(request.contains("我是")) return "哈哈!我记得了!"; return "不想跟你说话"; } }; final Client client1 = new NioClient(){ public String getName(){ return "客户端A"; } @Override public String running(String response) { if(response == null) return "你好!"; try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } if(response.contains("你是")) return "我是客户端A!你不记得了吗?"; if(response.contains("我记得")) return "记得就好!上次说好要请我吃饭的,什么时候……"; return "不想跟你说话!"; } }; final Client client2 = new NioClient(){ public String getName(){ return "客户端B"; } @Override public String running(String response) { if(response == null) return "你好!"; try { Thread.sleep(700); } catch (InterruptedException e) { e.printStackTrace(); } if(response.contains("你是")) return "我是客户端B!你不记得了吗?"; if(response.contains("我记得")) return "出门忘带钱包了,能不能先错我点?"; return "不想跟你说话!"; } }; // 模拟App server.start(); client1.start(); //client2.start(); // 2秒后停止运行 Thread.sleep(2000); client1.stop(); // 3秒后停止运行 Thread.sleep(3000); client2.stop(); server.stop(); client1.waitStop(); client2.waitStop(); server.waitStop(); } // ================================================== // base // ================================================== public abstract class Log { public String getName(){ return toString(); } public final void log(String contents){ System.out.println(getName() + " -> " + (new Date().getTime() - time) + ": " + getName() + " -> " + contents); } public final void logError(String contents){ System.out.println(getName() + " -> " + (new Date().getTime() - time) + ": " + getName() + " -> " + contents + "!!!!!!!!!!!!!!!!!!!!!!!!!!"); } } public interface App { public abstract void start(); public abstract void stop(); public abstract void waitStop(); } public abstract class AbstractApp extends Log implements App { protected boolean run = false; private final AtomicBoolean running = new AtomicBoolean(false); protected abstract void run(); public final void start(){ if(run || running.get()) return ; run = true; new Thread(){ public void run() { try { log("启动"); running.compareAndSet(false, true); AbstractApp.this.run(); } finally { running.compareAndSet(true, false); log("已停止"); } } }.start(); } protected void stopImpl(){ } public final void stop() { if(!run) return ; if(running.get()) log("停止开始!"); run = false; stopImpl(); } public final void waitStop() { if(run) return ; while(running.get()){ log("等待停止中……"); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } } public abstract class Server extends AbstractApp { public String getName(){ return "服务器"; } /** * 处理请求,并返回需要发送的响应信息 */ public abstract String service(String request); } public interface Client extends App { /** * 接收响应,并返回需要发送的请求信息 */ public abstract String running(String response); } public class Session { protected static final int RECEIVE_MAX_LEN = 100; private ByteBuffer receiveContents = ByteBuffer.allocate(RECEIVE_MAX_LEN); private boolean receiveOutMaxLen = false; protected final String onReceive(ByteBuffer buffer){ while(buffer.hasRemaining()){ byte b = buffer.get(); if(b == 0){ if(!receiveOutMaxLen){ receiveContents.flip(); String result = new String(receiveContents.array(), 0, receiveContents.limit(), Charset.defaultCharset()); receiveContents.clear(); return result; } else { return "error.outLen"; } } else if(receiveOutMaxLen) { // 处理接收数据的长度超出限制的情况,这里处理方式为一直接收,无时间限制。 // if(!run) // return null; } else if(receiveContents.hasRemaining()) { receiveContents.put(b); } else { receiveOutMaxLen = true; receiveContents.clear(); } } return null; } } // ================================================== // BIO // ================================================== // 省略
// ================================================== // NIO // ================================================== public class NioSession extends Session { private String contents = null; private int id; public NioSession() { } public NioSession(int id) { this.id = id; } public int getId(){ return id; } public String getContents(){ return contents; } public void setContents(String contents) { this.contents = contents; } } public abstract class NioSocketApp extends AbstractApp { private ByteBuffer buffer = ByteBuffer.allocate(NioSession.RECEIVE_MAX_LEN); protected final String receive(String target, SelectionKey key){ SocketChannel socket = (SocketChannel) key.channel(); NioSession session = (NioSession) key.attachment(); int count = -1; try { count = socket.read(buffer); } catch (IOException e) { logError(e.getMessage()); // e.printStackTrace(); } if(count == -1){ return "disconnect"; } if(count > 0){ buffer.flip(); try { String contents = session.onReceive(buffer); if(contents != null) { session.setContents(contents); return contents; } } finally { buffer.compact(); } } return null; } protected final boolean send(String target, SelectionKey key){ SocketChannel socket = (SocketChannel) key.channel(); NioSession session = (NioSession) key.attachment(); String response = session.getContents(); log("发送" + target + ":" + response + "!"); ByteBuffer buffer = ByteBuffer.wrap((response + (char)0).getBytes()); log("正在发送" + target + "……"); try { socket.write(buffer); } catch (IOException e) { log(e.getMessage()); return run; } log("发送" + target + "成功!"); return run; } protected void closeSocket(SelectionKey key){ try { key.channel().close(); } catch (IOException e) { e.printStackTrace(); } log("与客户端断开连接!"); } } public abstract class NioServer extends Server { private Selector acceptorSelector = null; private AtomicInteger nextServiceNameNum = new AtomicInteger(1); protected void run(){ ServerSocketChannel acceptor = null; Service service = null; try { try { acceptor = ServerSocketChannel.open(); acceptor.bind(new InetSocketAddress("localhost", 8080)); acceptor.configureBlocking(false); service = new Service(); service.init(); service.start(); acceptorSelector = Selector.open(); acceptor.register(acceptorSelector, SelectionKey.OP_ACCEPT); } catch (IOException e1) { e1.printStackTrace(); log("启动失败!"); return ; } while(run){ log("正在接收客户端连接中……"); try { if(acceptorSelector.select(1000) > 0){ Iterator<SelectionKey> keys = acceptorSelector.selectedKeys().iterator(); while(keys.hasNext()){ SelectionKey key = keys.next(); if(key.isAcceptable()){ SocketChannel socket = acceptor.accept(); // 这里有可能获取的socket为空 // if(socket != null){ log("接收到一个客户端连接!"); service.register(socket); // } } keys.remove(); } } } catch (IOException e) { logError(e.getMessage()); e.printStackTrace(); } } } finally { if(acceptor != null){ if(acceptorSelector != null) { try { acceptorSelector.close(); } catch (IOException e1) { e1.printStackTrace(); } } acceptorSelector = null; if(service != null){ service.stop(); service.waitStop(); } try { acceptor.close(); } catch (IOException e) { e.printStackTrace(); } } log("已停止!"); } } protected void stopImpl() { Selector selector = this.acceptorSelector; if(selector != null){ selector.wakeup(); } } private class Service extends NioSocketApp{ private Selector selector; private Object blockLock = null; private int serviceId = 0; public String getName(){ return NioServer.this.getName() + "/" + "服务" + (serviceId > 0 ? serviceId : ""); } public void init() throws IOException{ if(selector == null) selector = Selector.open(); } @Override protected void run() { try { Object blockLock = null; while(run){ try { if(selector.select() > 0){ Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); while(keys.hasNext()){ SelectionKey key = keys.next(); keys.remove(); NioSession session = (NioSession) key.attachment(); serviceId = session.getId(); if(key.isReadable()){ String request = receive("客户端请求", key); if(request == null) continue ; // 不会发生!NIO的读取数据都是就绪,也就是说总会读取到数据。 if(request.equals("disconnect")){ closeSocket(key); continue ; } if(request.startsWith("error")) { session.setContents("请求格式不正确!"); } else { String response = service(request); if(response == null){ closeSocket(key); continue; } session.setContents(response); } key.interestOps(SelectionKey.OP_WRITE); } if(key.isWritable()){ if(!send("响应", key)){ closeSocket(key); continue; } key.interestOps(SelectionKey.OP_READ); } serviceId = 0; } } } catch (IOException e) { e.printStackTrace(); } if(this.blockLock != null){ blockLock = this.blockLock; if(blockLock != null){ synchronized(blockLock){ // 不操作,这里同步,为了让register方法有机会同步块。 } } } } } finally { if(selector != null){ for(SelectionKey key : selector.keys()){ NioSession session = (NioSession) key.attachment(); serviceId = session.getId(); closeSocket(key); serviceId = 0; } try { selector.close(); } catch (IOException e) { e.printStackTrace(); } selector = null; } } } public void register(SocketChannel socket) { try { blockLock = socket.blockingLock(); synchronized(blockLock){ socket.configureBlocking(false); // 这里要唤醒,因为register和selector是互斥锁方法。 selector.wakeup(); SelectionKey key = socket.register(selector, SelectionKey.OP_READ); key.attach(new NioSession(nextServiceNameNum.getAndIncrement())); blockLock = null; } } catch (IOException e) { e.printStackTrace(); try { socket.close(); } catch (IOException e1) { e1.printStackTrace(); } } } protected void stopImpl() { Selector selector = this.selector; if(selector != null){ selector.wakeup(); } } } } public abstract class NioClient extends NioSocketApp implements Client { public NioClient() { } private Selector selector = null; private SocketChannel socket = null; private boolean connected = false; public String getName(){ return "Nio客户端"; } public void run(){ try { try { selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); return ; } // 连接 -> 运行 while(connect() && running()){ // 关闭连接后,将重新连接 closeConnect(); } } finally { // 退出运行时,关闭连接 closeConnect(); if(selector != null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } } private boolean connect(){ if(socket != null) throw new RuntimeException("connect is not closed"); log("正在连接服务器中……"); while(run){ try { socket = SocketChannel.open(); socket.configureBlocking(false); socket.connect(new InetSocketAddress("localhost", 8080)); socket.register(selector, SelectionKey.OP_CONNECT); return run; } catch (IOException e) { e.printStackTrace(); closeConnect(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } return false; } private void closeConnect(){ if(socket != null){ if(connected) { connected = false; log("与服务器断开连接!"); } try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null; } } private boolean running(){ while(run){ try { if(selector.select() > 0){ Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); while(keys.hasNext()){ SelectionKey key = keys.next(); keys.remove(); if(key.isConnectable()){ socket.finishConnect(); key.attach(new NioSession()); key.interestOps(SelectionKey.OP_WRITE); log("连接服务器成功!"); } NioSession session = (NioSession) key.attachment(); if(key.isWritable()){ String request = running(session.getContents()); session.setContents(request); if(!send("响应", key)){ closeSocket(key); continue; } key.interestOps(SelectionKey.OP_READ); } if(key.isReadable()){ String response = receive("服务器响应", key); if(response == null) continue ; // 不会发生!NIO的读取数据都是就绪,也就是说总会读取到数据。 if(response.equals("disconnect")){ closeSocket(key); continue ; } if(response.startsWith("error")) { log("接收服务器响应时,发生错误:" + response); } else log("接收到服务器响应:" + response); key.interestOps(SelectionKey.OP_WRITE); } } } } catch (IOException e) { e.printStackTrace(); } } return true; } protected void stopImpl() { Selector selector = this.selector; if(selector != null){ selector.wakeup(); } } } // ================================================== // AIO // ================================================== // 省略…… }
第五步:实现AIO
AIO本来可以很简单的实现的,但因为考滤用最少的线程实线,就变得复杂了点。虽然它没有BIO那么简单,但它灵活性高,不阻塞,跟NIO相比的话,AIO各方面都比较方便一点。
import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketTimeoutException; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.Date; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; public class Test { public static void main(String[] args) throws InterruptedException { new Test().test(); } private static final long time = new Date().getTime(); public void test() throws InterruptedException{ final Server server = new AioServer(){ @Override public String service(String request) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } if(request == null) // system error? return "???"; if(request.contains("你好")) return "你好!你是?"; if(request.contains("我是")) return "哈哈!我记得了!"; return "不想跟你说话"; } }; final Client client1 = new AioClient(){ public String getName(){ return "客户端A"; } @Override public String running(String response) { if(response == null) return "你好!"; try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } if(response.contains("你是")) return "============我是客户端A!你不记得了吗?==========="; if(response.contains("我记得")) return "记得就好!上次说好要请我吃饭的,什么时候……"; return "不想跟你说话!"; } }; final Client client2 = new AioClient(){ public String getName(){ return "客户端B"; } @Override public String running(String response) { if(response == null) return "你好!"; try { Thread.sleep(700); } catch (InterruptedException e) { e.printStackTrace(); } if(response.contains("你是")) return "==========我是客户端B!你不记得了吗?=========="; if(response.contains("我记得")) return "出门忘带钱包了,能不能先错我点?"; return "不想跟你说话!"; } }; // 模拟App server.start(); client1.start(); client2.start(); // 2秒后停止运行 Thread.sleep(2000); client1.stop(); // 3秒后停止运行 Thread.sleep(3000); client2.stop(); server.stop(); client1.waitStop(); client2.waitStop(); server.waitStop(); } // ================================================== // base // ================================================== public abstract class Log { public String getName(){ return toString(); } public final void log(String contents){ System.out.println(getName() + " -> " + (new Date().getTime() - time) + ": " + getName() + " -> " + contents); } public final void logError(String contents){ System.out.println(getName() + " -> " + (new Date().getTime() - time) + ": " + getName() + " -> " + contents + "!!!!!!!!!!!!!!!!!!!!!!!!!!"); } } public interface App { public abstract void start(); public abstract void stop(); public abstract void waitStop(); } public abstract class AbstractApp extends Log implements App { protected boolean run = false; private final AtomicBoolean running = new AtomicBoolean(false); protected abstract void run(); public final void start(){ if(run || running.get()) return ; run = true; new Thread(){ public void run() { try { log("启动"); running.compareAndSet(false, true); AbstractApp.this.run(); } finally { running.compareAndSet(true, false); log("已停止"); } } }.start(); } protected void stopImpl(){ } public final void stop() { if(!run) return ; if(running.get()) log("停止开始!"); run = false; stopImpl(); } public final void waitStop() { if(run) return ; while(running.get()){ log("等待停止中……"); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } } public abstract class Server extends AbstractApp { public String getName(){ return "服务器"; } /** * 处理请求,并返回需要发送的响应信息 */ public abstract String service(String request); } public interface Client extends App { /** * 接收响应,并返回需要发送的请求信息 */ public abstract String running(String response); } public class Session { protected static final int RECEIVE_MAX_LEN = 100; private ByteBuffer receiveContents = ByteBuffer.allocate(RECEIVE_MAX_LEN); private boolean receiveOutMaxLen = false; protected final String onReceive(ByteBuffer buffer){ while(buffer.hasRemaining()){ byte b = buffer.get(); if(b == 0){ if(!receiveOutMaxLen){ receiveContents.flip(); String result = new String(receiveContents.array(), 0, receiveContents.limit(), Charset.defaultCharset()); receiveContents.clear(); return result; } else { return "error.outLen"; } } else if(receiveOutMaxLen) { // 处理接收数据的长度超出限制的情况,这里处理方式为一直接收,无时间限制。 // if(!run) // return null; } else if(receiveContents.hasRemaining()) { receiveContents.put(b); } else { receiveOutMaxLen = true; receiveContents.clear(); } } return null; } } // ================================================== // BIO // ================================================== // 省略……
// ================================================== // NIO // ================================================== // 省略……
// ================================================== // AIO // ================================================== public class AioSession extends Session { private int id; private ByteBuffer buffer = ByteBuffer.allocate(AioSession.RECEIVE_MAX_LEN); private AsynchronousSocketChannel socket; private Future<Integer> future; private boolean read = true; public AioSession(AsynchronousSocketChannel socket) { this.socket = socket; } public AioSession(int id, AsynchronousSocketChannel socket) { this.id = id; this.socket = socket; } public int getId(){ return id; } public ByteBuffer getBuffer(){ return buffer; } public AsynchronousSocketChannel getSocket(){ return socket; } public Future<Integer> getFuture(){ return future; } public void setFuture(Future<Integer> future, boolean read){ this.future = future; this.read = read; } public boolean isRead() { return read; } } public abstract class AioSocketApp extends AbstractApp { protected final String receive(String target, AioSession session){ AsynchronousSocketChannel socket = session.getSocket(); ByteBuffer buffer = session.getBuffer(); Future<Integer> future = session.getFuture(); int count = -1; if(future != null){ try { count = future.get(); } catch (InterruptedException | ExecutionException e) { logError(e.getMessage()); // e.printStackTrace(); } if(count == -1){ return "disconnect"; } } if(future == null || count > 0){ if(count > 0){ buffer.flip(); try { String contents = session.onReceive(buffer); if(contents != null) return contents; } finally { buffer.compact(); } } future = socket.read(buffer); session.setFuture(future, true); } return null; } protected final boolean send(String target, String contents, AioSession session){ AsynchronousSocketChannel socket = session.getSocket(); log("发送" + target + ":" + contents + "!"); ByteBuffer buffer = ByteBuffer.wrap((contents + (char)0).getBytes()); log("正在发送" + target + "……"); session.setFuture(socket.write(buffer), false); return run; } } public abstract class AioServer extends Server { private AtomicInteger nextServiceNameNum = new AtomicInteger(1); protected void run(){ Service service = new Service(); AsynchronousServerSocketChannel acceptor = null; try { acceptor = AsynchronousServerSocketChannel.open(); acceptor.bind(new InetSocketAddress("localhost", 8080)); service = new Service(); service.start(); while(run){ Future<AsynchronousSocketChannel> future = null; try { while(run){ log("正在接收客户端连接……"); if(future == null) future = acceptor.accept(); if(future.isDone()){ log("接收到一个客户端的连接!"); AsynchronousSocketChannel socket = null; try { socket = future.get(); service.add(socket); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); socket.close(); } future = null; continue ; } if(future.isCancelled()){ log("接收客户端连接已中止!"); future = null; continue ; } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } finally { if(future != null && !future.cancel(false)){ while(!(future.isDone() || future.isCancelled())){ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } catch (IOException e){ e.printStackTrace(); } finally { if(acceptor != null){ if(service != null){ service.stop(); service.waitStop(); service.clear(); } try { acceptor.close(); } catch (IOException e) { e.printStackTrace(); } } } } private class Service extends AioSocketApp { private BlockingQueue<AsynchronousSocketChannel> sockets = new ArrayBlockingQueue<AsynchronousSocketChannel>(100); private Set<AioSession> sessions = new HashSet<AioSession>(); private int serviceId = 0; public String getName(){ return AioServer.this.getName() + "/" + "服务" + (serviceId > 0 ? serviceId : ""); } @Override protected void run() { AsynchronousSocketChannel socket = null; while(run){ while(run && !sockets.isEmpty()) { try { socket = sockets.poll(1, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); continue; } if(socket == null) continue; AioSession session = new AioSession(nextServiceNameNum.getAndIncrement(), socket); sessions.add(session); receive(session); } if(sessions.size() > 0){ Iterator<AioSession> sessions = new ArrayList<AioSession>(this.sessions).iterator(); while(run && sessions.hasNext()){ AioSession session = sessions.next(); serviceId = session.getId(); if(session.isRead()) receive(session); else send(session); serviceId = 0; } } }; } private void receive(AioSession session){ receive(session, false); } private void receive(AioSession session, boolean reset){ if(reset) session.setFuture(null, true); Future<Integer> future = session.getFuture(); if(future == null || future.isDone()){ String request = receive("客户端请求", session); if(request == null) return ; if(request.equals("disconnect")){ closeSocket(session); return ; } String response = null; if(request.startsWith("error")) { response = "请求格式不正确!"; } else { log("接收到客户端请求:" + request); response = service(request); if(response == null){ closeSocket(session); return ; } } if(!send("响应", response, session)) closeSocket(session); } else if(future.isCancelled()){ closeSocket(session); log("接收客户端请求已中止!"); } } private void send(AioSession session){ Future<Integer> future = session.getFuture(); if(future.isDone()){ log("发送响应成功!"); // 重置future receive(session, true); } else if(future.isCancelled()){ closeSocket(session); log("发送响应已中止!"); } } protected void closeSocket(AioSession session) { Future<Integer> future = session.getFuture(); if(future != null && !future.cancel(false)){ while(!(future.isDone() || future.isCancelled())){ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } session.setFuture(null, true); } try { session.getSocket().close(); } catch (IOException e) { e.printStackTrace(); } sessions.remove(session); log("与客户端断开连接!"); } public void add(AsynchronousSocketChannel socket) throws InterruptedException { sockets.put(socket); } public void clear(){ while(!sockets.isEmpty()){ AsynchronousSocketChannel socket = null; try { socket = sockets.poll(1, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } if(sessions.size() > 0){ Iterator<AioSession> sessions = new ArrayList<AioSession>(this.sessions).iterator(); while(sessions.hasNext()){ AioSession session = sessions.next(); closeSocket(session); } } } } } public abstract class AioClient extends AioSocketApp implements Client { public AioClient() { } public String getName(){ return "Aio客户端"; } AsynchronousSocketChannel socket = null; AioSession session = null; public void run(){ try { while(connect() && running()){ closeConnect(); } } finally { closeConnect(); } } private boolean connect(){ while(run){ Future<Void> future = null; try { try { socket = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); closeConnect(); return false; } future = socket.connect(new InetSocketAddress("localhost", 8080)); while(run){ if(future.isDone()){ session = new AioSession(socket); log("连接服务器成功!"); return true; } if(future.isCancelled()){ log("连接服务器终止!"); } } } finally { if(future != null && !future.cancel(false)){ while(!(future.isCancelled() || future.isDone())){ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } } } return false; } private void closeConnect(){ if(session != null){ Future<Integer> future = session.getFuture(); if(future != null){ while(!(future.isCancelled() || future.isDone())){ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } session = null; } if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null; } } private boolean running(){ String response = null; String request = running(response); // 循环请求 while(run){ // 退出运行 if(request == null) break ; // 清空响应 response = null; // 发送请求 if(send("请求", request, session)){ Future<Integer> future = session.getFuture(); while(run){ if(future.isDone()){ log("发送请求成功!"); break; } if(future.isCancelled()){ log("发送请求已中止!"); return run; } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } // 接收响应 future = null; while(run){ if(future == null || future.isDone()){ response = receive("服务器响应", session); if(response == null) { future = session.getFuture(); continue ; } if(response.equals("disconnect")) return run; if(response.startsWith("error")) { request = "请求格式不正确!"; } else { log("接收到服务器响应:" + response); request = running(response); } break ; } else if(future.isCancelled()) { log("接收客户端响应已中止!"); return true; } } } } return run; } } }